**Training a decision tree classifier to predicate whether a mushroom is poisonous based on its feature, the data used for training is from "agaricus-lepiota" database.** 


In [1]:
from random import *
feature_list=["cap-shape","cap-surface","cap-color","bruises","odor","gill-attachment","gill-spacing","gill-size","gill-color","stalk-shape","stalk-root",
              "stalk-surface-above-ring","stalk-surface-below-ring","stalk-color-above-ring","stalk-color-below-ring","veil-type","veil-color","ring-number",
              "ring-type","spore-print-color","population","habitat"]
#Set up the list of the names of feature 
feature_dict=dict()
for feature in feature_list:
    feature_dict[feature]=[]
#Establish the dictionary to store all possible values for each feature.
file=open("agaricus-lepiota.data","r")
dataset=[]
for line in file.readlines():
    line=line.removesuffix("\n")
    example=line.split(",")
    label=example[0]
    example=example[1:]
    if "?" in example:
        continue
    for i in range(len(feature_list)):
        feature=feature_list[i]
        if example[i] not in feature_dict[feature]:
            feature_dict[feature].append(example[i])
    dataset.append((example,label))
#Read through the dataset, store any seen feature value into the dictionary. If the data entry has missing value ignore it.
print("The label of the dataset is edibility [\'e\', \'p\']")
#label with e value is edible, while label with p value is poisonous.
print("The feature of the dataset are: ")
for i in feature_dict:
    print(i, feature_dict[i])
seed(100)
testset=[]
trainset=[]
for i in dataset:
    if random()<=0.3:
        testset.append(i)
    else:
        trainset.append(i)
#Data for training and Data for testing need to be completely exclusive to each other. In this model, the training set and testing set are split randomly at 70:30 ratio.
print("-------")
print("There is {0} examples int the trainning set.".format(len(trainset)))
print("There is {0} examples int the testing set.".format(len(testset)))


The label of the dataset is edibility ['e', 'p']
The feature of the dataset are: 
cap-shape ['x', 'b', 's', 'f', 'k', 'c']
cap-surface ['s', 'y', 'f', 'g']
cap-color ['n', 'y', 'w', 'g', 'e', 'p', 'b', 'c']
bruises ['t', 'f']
odor ['p', 'a', 'l', 'n', 'f', 'c', 'm']
gill-attachment ['f', 'a']
gill-spacing ['c', 'w']
gill-size ['n', 'b']
gill-color ['k', 'n', 'g', 'p', 'w', 'h', 'u', 'r', 'y']
stalk-shape ['e', 't']
stalk-root ['e', 'c', 'b', 'r']
stalk-surface-above-ring ['s', 'f', 'k', 'y']
stalk-surface-below-ring ['s', 'f', 'y', 'k']
stalk-color-above-ring ['w', 'g', 'p', 'n', 'b', 'c', 'y']
stalk-color-below-ring ['w', 'p', 'g', 'b', 'n', 'c', 'y']
veil-type ['p']
veil-color ['w', 'y']
ring-number ['o', 't', 'n']
ring-type ['p', 'e', 'l', 'n']
spore-print-color ['k', 'n', 'u', 'h', 'r', 'w']
population ['s', 'n', 'a', 'v', 'y', 'c']
habitat ['u', 'g', 'm', 'd', 'p', 'l']
-------
There is 3985 examples int the trainning set.
There is 1659 examples int the testing set.


In [2]:
def get_feature(featture_index):
    return feature_list[featture_index]
#This is the method to get the name of the feature with its index
def get_feature_type(feature_index):
    return feature_dict[get_feature(feature_index)]
#This is the method to get all possible value of the feature with its index

The first code block processes the agaricus-lepiota.data file and seperate the examples in the dataset into the testing set that reserved to test the model and the training set reserved to train the model.

It also extract all features of the data set in `feature_list`. For each feature, it records all posible types of the feature into `feature_dict`. The second code block provide utility method to transfer feature's index in `feature_list` to the name of respective feature and to all its possible type.

In [3]:
from enum import Enum
from abc import ABC, abstractmethod
#Import abstract class and Enumeration for my decision tree model implementation
class PredictType(Enum):
    Result = 0
    Condition = 1

class PredictModel(ABC):
    def __init__(self) -> None:
        pass
    @abstractmethod
    def predict(self,features):
        raise NotImplementedError
#The supperclass of all decision tree nodes
    
class ResultPredict(PredictModel):
    def __init__(self,result) -> None:
        super().__init__()
        self.__result=result
    def predict(self, features):
        return PredictType.Result,self.__result
    def __str__(self) -> str:
        return "end("+self.__result+")"
#The leaf nodes of the decision tree, it return its predicted label value

class ConditionPredict(PredictModel):
    def __init__(self,feature_index) -> None:
        super().__init__()
        self.__feature_index=feature_index
    
    def predict(self, features):
        return PredictType.Condition,features[self.__feature_index]
    
    def __str__(self) -> str:
        return get_feature(self.__feature_index)
#The internal nodes of the decision tree, it return the index of feature that it want to split entry into.
        
class DecisionTreeNode:
    def __init__(self,name,data=None) -> None:
        self.__name=name
        self.__childs=[]
        self.__parent=None
        self.__predictModel: PredictModel = None
        self.__data=data
    
    @property
    def name(self):
        return self.__name
    @property
    def data(self):
        return self.__data
    @property
    def child(self):
        return self.__childs
    def addChild(self,node):
        self.__childs.append(node)
        node.__parent=self
    
    def setPredictModel(self,model):
        if isinstance(model,PredictModel):
            self.__predictModel=model
        return self

    def predict(self,features):
        if self.__predictModel==None:
            raise ValueError
        predict_type,predict_result=self.__predictModel.predict(features)
        if predict_type==PredictType.Result:
            return predict_result
        else:
            for child_node in self.__childs:
                if child_node.name==predict_result:
                    return child_node.predict(features)
            return "p"
                
    def print_tree(self):
        print(self)
        for i in self.__childs:
            i.print_tree()

    def __str__(self) -> str:
        if self.__parent == None:
            return self.__name+ ("" if self.__predictModel==None else "-"+str(self.__predictModel))
        else:
            return str(self.__parent)+"-"+self.__name+("" if self.__predictModel==None else "-"+str(self.__predictModel))
#This is the decision tree model, which I use node list as its data structure. Its prediction model is the node of decision tree that it represent. It would also contain the dataset it has to process to make training process much easier to implement.


This code block provides clases that support a decision tree prediction model. I choose to use linked tree as its data structure because it's easy to expand its range and propogate a change across multiple nodes.

In [4]:
def countLabel(data):
    count_e=0
    count_p=0
    for i in data:
        if i[1]=="e":
            count_e+=1
        elif i[1]=="p":
            count_p+=1
    return count_e,count_p

def calculate_entropy(data):
    count_e,count_p=countLabel(data)
    if count_e==0 or count_p==0:
        return 0
    else:
        probability_e=count_e/len(data)
        probability_p=count_p/len(data)
        return -probability_e*math.log2(probability_e)-probability_p*math.log2(probability_p)

#calculate the entropy of a set of data, the less entropy it's much easier to predict. So we want to choose the feature which split dataset according to value of it will generate several dataset that has a sum of lower entropy than before.

def getMostAccurate(data):
    count_e,count_p=countLabel(data)
    return "e" if count_e>count_p else "p"

#Get the most accurate prediction for certian data set.

def test_split_feature(data,feature_index):
    total_len=len(data)
    data=list(data)
    feature_type=get_feature_type(feature_index)
    split_set=[]
    for i in range(len(feature_type)):
        split=[]
        individual_class=feature_type[i]
        for sample in data:
            if sample[0][feature_index]==individual_class:
                split.append(sample)
        for example in split:
            data.remove(example)
        if len(split)!=0:
            split_set.append((split,individual_class))
    entropy=0
    for i in split_set:
        entropy+=len(i)/total_len*calculate_entropy(i[0])
    return entropy,split_set
#Calculate the change in entropy after splitting dataset according to a feature.

def EntropySort(elm:DecisionTreeNode):
    return calculate_entropy(elm.data)

#Sort function based the entropy of data in the DecisionTreeNode

In [5]:
import math
remaining_feature=[]
notProcessedNode=[]
#remaining_feature store all feature that not yet to be split on. while notProcessingNode stored decision tree nodes which data set has not yet be processed and decided a model

def train_model(data):
    remaining_feature.clear()
    notProcessedNode.clear()
    for i in range(22):
        remaining_feature.append(i)
    head=DecisionTreeNode("Head",data)
    notProcessedNode.append(head)
    train_find_predict_model()
    return head
def train_find_predict_model():
    decision_tree_node=notProcessedNode[0]
    data=decision_tree_node.data
    if len(data)<=3:
        decision_tree_node.setPredictModel(ResultPredict(getMostAccurate(data)))
    else:
        base_entropy=calculate_entropy(data)
        infogain=0
        best_feature=-1
        best_split_data=[]
        for i in remaining_feature:
            entropy,split_data=test_split_feature(data,i)
            if infogain < base_entropy-entropy:
                infogain=base_entropy-entropy
                best_feature=i
                best_split_data=split_data
        if best_feature==-1:
            decision_tree_node.setPredictModel(ResultPredict(getMostAccurate(data)))
        else:
            decision_tree_node.setPredictModel(ConditionPredict(best_feature))
            remaining_feature.remove(best_feature)
            for i in best_split_data:
                new_node=DecisionTreeNode(i[1],i[0])
                decision_tree_node.addChild(new_node)
                notProcessedNode.append(new_node)
    notProcessedNode.remove(decision_tree_node)
    if len(notProcessedNode)<=0:
        return
    else:
        if len(remaining_feature)<=1:
            for node in notProcessedNode:
                node.setPredictModel(ResultPredict(getMostAccurate(node.data)))
            return
        else:
            notProcessedNode.sort(key=EntropySort,reverse=True)
            train_find_predict_model()
#The training algorithm is to find the best feature to split data stored inside a node. If no feature can reduce the entropy, then the node will become a leaf that return the prediction.

This code block is the algorithm to train a decision tree model given a data set. For each split, the algorithm will look for the tree node that has the most entropy subsets. It would then use information gain criteria to choose the best feature to split its subsets. The algorithm will not choose the same feature twice to split, and will not further split a node if it sentropy can not be improved or its subset is too small.  

In [6]:
train_data=[]
i=0
seed()
while i<2000:
    train_data.append(trainset[randint(0,len(trainset)-1)])
    i+=1
#Getting training data by bootstrapping to reduce overfitting

model=train_model(train_data)
print("The trained decision model tree is:")
model.print_tree()
print("-------------")
random_test=testset[randint(0,len(testset)-1)]
print("A random sample from the test set has feature of {0}".format(random_test[0]))
print("Its label is {0}".format(random_test[1]))
print("Based on the feature, the model will predict the label is {0}".format(model.predict(random_test[0])))

The trained decision model tree is:
Head-odor
Head-odor-p-end(p)
Head-odor-a-end(e)
Head-odor-l-end(e)
Head-odor-n-spore-print-color
Head-odor-n-spore-print-color-k-end(e)
Head-odor-n-spore-print-color-n-end(e)
Head-odor-n-spore-print-color-r-end(p)
Head-odor-n-spore-print-color-w-end(e)
Head-odor-f-end(p)
Head-odor-c-end(p)
Head-odor-m-end(p)
-------------
A random sample from the test set has feature of ['f', 's', 'g', 't', 'f', 'f', 'c', 'b', 'p', 't', 'b', 's', 's', 'w', 'w', 'p', 'w', 'o', 'p', 'h', 's', 'g']
Its label is p
Based on the feature, the model will predict the label is p


This clock block prepare training samples and execute the `train_model` function. It uses a randomized seed to bootstrap training smaples to explore different possible models. The sample size is set to 2000

There is one thing particular across all decision tree models that their first split will always be "odor" and none odor (n) will be the only subset that algorithm think required further spliting.

**Modify the decision tree with limited depth specified as a parameter of the training**


In [7]:
depth=0
def train_model_depth_control(data,stopped_depth):
    global depth
    remaining_feature.clear()
    notProcessedNode.clear()
    for i in range(22):
        remaining_feature.append(i)
    depth=stopped_depth
    head=DecisionTreeNode("Head",data)
    notProcessedNode.append(head)
    train_find_predict_model_depth_control()
    return head
def train_find_predict_model_depth_control():
    global depth
    decision_tree_node=notProcessedNode[0]
    data=decision_tree_node.data
    if len(data)<=3:
        decision_tree_node.setPredictModel(ResultPredict(getMostAccurate(data)))
    else:
        base_entropy=calculate_entropy(data)
        infogain=0
        best_feature=-1
        best_split_data=[]
        for i in remaining_feature:
            entropy,split_data=test_split_feature(data,i)
            if infogain < base_entropy-entropy:
                infogain=base_entropy-entropy
                best_feature=i
                best_split_data=split_data
        if best_feature==-1:
            decision_tree_node.setPredictModel(ResultPredict(getMostAccurate(data)))
        else:
            decision_tree_node.setPredictModel(ConditionPredict(best_feature))
            remaining_feature.remove(best_feature)
            depth-=1
            for i in best_split_data:
                new_node=DecisionTreeNode(i[1],i[0])
                decision_tree_node.addChild(new_node)
                notProcessedNode.append(new_node)
    notProcessedNode.remove(decision_tree_node)
    if len(notProcessedNode)<=0:
        return
    else:
        if len(remaining_feature)<=1 or depth<=0:
            for node in notProcessedNode:
                node.setPredictModel(ResultPredict(getMostAccurate(node.data)))
            return
        else:
            notProcessedNode.sort(key=EntropySort,reverse=True)
            train_find_predict_model_depth_control()
#added global variable depth to limit the exploration of training algorithm

In [8]:
train_data=[]
i=0
seed()
while i<2000:
    train_data.append(trainset[randint(0,len(trainset)-1)])
    i+=1

model=train_model_depth_control(train_data,4)
print("The decision model tree with depth 4 is:")
model.print_tree()

The decision model tree with depth 4 is:
Head-odor
Head-odor-p-end(p)
Head-odor-a-end(e)
Head-odor-l-end(e)
Head-odor-n-veil-color
Head-odor-n-veil-color-w-gill-attachment
Head-odor-n-veil-color-w-gill-attachment-f-veil-type
Head-odor-n-veil-color-w-gill-attachment-f-veil-type-p-end(e)
Head-odor-n-veil-color-y-end(p)
Head-odor-f-end(p)
Head-odor-c-end(p)
Head-odor-m-end(p)


In [9]:
model=train_model_depth_control(train_data,3)
print("The decision model tree with depth 3 is:")
model.print_tree()

The decision model tree with depth 3 is:
Head-odor
Head-odor-p-end(p)
Head-odor-a-end(e)
Head-odor-l-end(e)
Head-odor-n-veil-color
Head-odor-n-veil-color-w-gill-attachment
Head-odor-n-veil-color-w-gill-attachment-f-end(e)
Head-odor-n-veil-color-y-end(p)
Head-odor-f-end(p)
Head-odor-c-end(p)
Head-odor-m-end(p)


In [10]:
model=train_model_depth_control(train_data,2)
print("The decision model tree with depth 2 is:")
model.print_tree()

The decision model tree with depth 2 is:
Head-odor
Head-odor-p-end(p)
Head-odor-a-end(e)
Head-odor-l-end(e)
Head-odor-n-veil-color
Head-odor-n-veil-color-w-end(e)
Head-odor-n-veil-color-y-end(p)
Head-odor-f-end(p)
Head-odor-c-end(p)
Head-odor-m-end(p)


These code blocks provdie a training algoeithm that have depth control implemented. `stopped_depth` argument will further limit the split that the algorithm will make.

The decision tree with value of `stopped_death` being 4,3,2 have their representation printed out repectively.

**Test the accuracy of the decision tree model**

In [11]:
def test_model(model:DecisionTreeNode, data):
    true_p=0
    true_n=0
    false_p=0
    false_n=0
    for i in data:
        predict=model.predict(i[0])
        if predict=="e":
            if predict==i[1]:
                true_p+=1
            else:
                false_p+=1
        elif predict=="p":
            if predict==i[1]:
                true_n+=1
            else:
                false_n+=1
        else:
            false_n+=1
    accuracy=(true_p+true_n)/(true_n+true_p+false_n+false_p)
    precision=true_p/(true_p+false_p)
    recall=true_p/(true_p+false_n)
    F1_score=2*(precision*recall)/(precision+recall)
    return true_p,true_n,false_p,false_n,accuracy,precision,recall,F1_score
#Model Testing functions,producing accuracy (ratio of the prediction is right), precision (ratio of the prediction is right when the prediction is positive), recall (ratio of the prediction is right when the label is positive) and F1-socre.

This code block provide the test procedure that take decision tree model and testing set, compare the predicted label from the model and the real label of data, and output numbers of true positive cases, true negative cases, false positive cases, false negative case, calculated model's accuracy, precision, recall and F1-score.

In [12]:
train_data=[]
i=0
seed()
while i<2000:
    train_data.append(trainset[randint(0,len(trainset)-1)])
    i+=1

model=train_model(train_data)
j=0
test_data=[]
while j<1000:
    test_data.append(testset[randint(0,len(testset)-1)])
    j+=1
#Testing set is also obtained through bootstrap
true_p,true_n,false_p,false_n,accuracy,precision,recall,F1_score=test_model(model,test_data)
print("When testing 1000 cases, the model made {0} true positive, {1} true negative, {2} false positive and {3} false negative".format(true_p,true_n,false_p,false_n))
print("In this test, the model as {0} accuracy, {1} precision and {2} recall. The F1 score evaluation of the model is {3}".format(accuracy,precision,recall,F1_score))

When testing 1000 cases, the model made 630 true positive, 369 true negative, 0 false positive and 1 false negative
In this test, the model as 0.999 accuracy, 1.0 precision and 0.9984152139461173 recall. The F1 score evaluation of the model is 0.9992069785884219


In this code block I use the non-depth-control algorithm training model to get rough estimation of the model's performance. I use randomized seed to booststrap 2000 examples of training set and 1000 examples of testing sets.

**Validate the perfromance of decision tree with Five-fold-cross-validation**

In [13]:
def five_fold_cross_validation(data):
    folds=[[],[],[],[],[]]
    data=list(data)
    while len(data)>0:
        fold_slot=list(range(5))
        while len(fold_slot)>0:
            seed()
            example=data.pop(randint(0,len(data)-1))
            slot=fold_slot.pop(randint(0,len(fold_slot)-1))
            folds[slot].append(example)
    accuracy_list=[]
    for i in range(5):
        valid_fold=folds[i]
        train_fold=[]
        for f in folds:
            if f!=valid_fold:
                train_fold+=f
        model=train_model_depth_control(train_fold,1)
        evaluation=test_model(model,valid_fold)
        accuracy_list.append(evaluation[4])
    return accuracy_list

I would use five fold cross validation as the evaluation for my decision tree algorithm

This code block provide an implementation of five fold cross validation on decision tree algorithm with `stopped_depth` being 1.

In [14]:
train_data=[]
i=0
seed()
while i<2000:
    train_data.append(trainset[randint(0,len(trainset)-1)])
    i+=1
fold_result=five_fold_cross_validation(train_data)
print("The validation results from the five fold cross validation is : {0}".format(fold_result))
print("The average validation accuracy for the model is {0}".format(sum(fold_result)/len(fold_result)))
j=0
test_data=[]
while j<1000:
    test_data.append(testset[randint(0,len(testset)-1)])
    j+=1
model=train_model_depth_control(train_data,1)
evaluation=test_model(model,test_data)
print("The test accuracy for the model is {0}, its F1-score is {1}".format(evaluation[4],evaluation[7]))

The validation results from the five fold cross validation is : [0.98, 0.9875, 0.98, 0.985, 0.9925]
The average validation accuracy for the model is 0.985
The test accuracy for the model is 0.994, its F1-score is 0.995334370139969


This code block run a five-fold cross validation on the algorithm to get the average validation accuracy, and then compare it to the test accuracy and F1-score.

The result show that they are very similar with each other, meaning the dection tree algorithm can indeed secure a 99% accuracy on the dataset.