In [1]:
import numpy as np
import pandas as pd
import seaborn as sns

## Node Class

In [2]:
class Node:
    '''
    The class Node has all the getter and setter functions that are necessary for a tree. 

    Furthermore it has additional setter and getter for the attributes, values and classification
    and a function for printing the tree. It also has a variable to indicate whether or not the 
    attribute had continous variables.

    The decision tree knows its attributes and the corresponding values in the following way:
    The parent node knows the attribute according to which the data was split.
    The children know the values they take according to the attribute of the parent node.
    '''
    
    def __init__(self, parent = None, attribute = None, classification = None, value = None, valueIsContinuous = False, target = None):
        self.children = []  
        self.parent = parent
        self.attribute = attribute
        self.classification = classification
        self.value = value
        self.valueIsContinuous = valueIsContinuous
        self.target = target

    
    # typical setter and getter for tree structure 
    def setChild(self, node):
        self.children.append(node)
        
    def setParent(self, node):
        if (self.parent is not None):
            self.parent.children.remove(self)
        self.parent = node

    def getChildren(self):
        return self.children
    
    def getParent(self):
        return self.parent
    

    # functions to check whether node is a leaf or root node
    def isLeaf(self):
        if len(self.children) > 0:
            return False
        else:
            return True

    def isRoot(self):
        if (self.parent is None):
            return True
        else:
            return False
    

    # -------------------------------------------------------------------------------------------
    # setter and getter for attributes, values and classification 

    def setAttribute(self, attribute):
        self.attribute = attribute
    
    def getAttribute(self):
        return self.attribute
    
    def setClassification(self, classification):
        self.classification = classification
    
    def getClassification(self):
        return self.classification
    
    def setValue(self,value):
        self.value = value

    def getValue(self):
        return self.value
    
    #-------------------------------------------------------------------------------------------
    # function to print the tree
    def printTree(self, level = 0):
        tab = "    "
        if level == 0:
            print("Decision tree:")
        if self.isLeaf():
            print(tab*level, "classification: ", self.target, " = " , self.classification)
        else:
            for child in self.children:
                # printing intervals
                if child.valueIsContinuous:
                    interval = "" 
                    if child.value[0] == np.NINF:
                        interval = f"smaller then {child.value[1]}"
                    elif child.value[1] == np.PINF:
                        interval = f"bigger then {child.value[0]}"
                    else:
                        interval = f"between {child.value[0]} and {child.value[1]}"
                    print(tab*level, self.attribute, ": ", interval)
                
                # printing discrete values
                else:
                    print(tab*level, self.attribute, ": ", child.value)
                
                # traverse deeper into the tree
                child.printTree(level + 1)

## Train_Data Class

In [3]:
class Train_data:
    '''
    The class Train_data has all important functions for the ID3 Algorithm with continous 
    and discrete variables:

    The functions isContinous(), getBoundaries(), setBoundaries() and replaceContinous() are 
    responsible for handeling continuous variables:
    isContinous() detects when an attribute column consists of continuous values which is 
    the case if there are more than 10 different numerical values. 
    getBoundaries() chooses the boundary tuples representing intervals according to the classifications. 
    When this returns more than 10 different intervals setBoundaries() sets 10 evenly sized 
    intervals independent of classification instead.
    With replaceContinous() the values is the continous attribute column are then replaced with 
    the boundary tuple in which they lie inbetween. 
    The fuction sortIntervals() sorts the resulting intervals.

    The functions entropy(), informationGain(), gainRatio() and chooseAttribute() are for choosing
    an attribute column with the highest gain Ratio. Gain Ratio is choosen instead of Information Gain,
    so that for many valued attributes those with few values are prefered.

    The functions classify() and id3() are for building the tree. Where classify() returns the 
    current classification of the node and id3() is the ID3 algorithm for training a decision tree.

    The class only takes a pd.Dataframe as data, string as target and a list of strings 
    as attributes. 
    Furthermore if wished the maximal recursion depth of the ID3 algorithm can be set via 
    max_recursion.
    '''
    def __init__(self, data, target, attributes, node:Node = None, recursion_depth = 0, continuous_splitting = 0.1,  max_recursion = np.PINF):
        
        self.data = data
        if not isinstance(self.data, pd.DataFrame):
            raise TypeError("Data has to be a Pandas Dataframe")
        
        self.target = target
        if not isinstance(self.target, str):
            raise TypeError("Taget has to be of type string")
        
        self.attributes = attributes
        if not isinstance(attributes, list):
            raise TypeError("Attributes have to have structure list")
            
        for attribute in self.attributes:
            if not isinstance(attribute, str):
                raise TypeError("Attributes have to be of type string")

        self.node = node
        self.continuous_splitting = continuous_splitting
        self.recursion_depth = recursion_depth
        self.max_recursion = max_recursion
    
    ######################################
    ## methods for continuous variables ##
    ######################################
    
    def isContinuous(self, values):
        # checks is variable is a continuous variable
        # (it is continuous if it has more than 10 different values and is a numericla scalar)
        if len(values) > 10:
            if isinstance(list(values)[5], int) or isinstance(list(values)[0], float):
                return True
        return False

    
    def getBoundaries(self, tColumn, aColumn):
        # by looking at the target column and the attribute column the
        # function decides on decision boundaries in a continuous varibale, where classification changes
        # aColumn -> attribute column with the continuous values
        # tColumn -> target column with the classification
        
        # 1) sort the two columns by attribute values
        columns = pd.DataFrame(data={"a":list(aColumn), "t":list(tColumn)}).sort_values(by="a")
        columns.index = range(len(columns))

        # 2) find decision boundaries where classification changes
        leftBound = np.NINF # first interval has negative infinity as left boundary
        rightBound = None
        boundaries = []
        currentClass = columns["t"][0]
        
        for i in range(len(columns)):
            
            # when classification changes
            if(columns["t"][i] != currentClass):
                currentClass = columns["t"][i]
                
                # get the value in the middel of the values where classification changes
                beforeSwitch = columns["a"][i-1]
                afterSwitch = columns["a"][i]
                rightBound = (beforeSwitch + afterSwitch) / 2

                # safe the tupple of two boundaries, 
                # represents an interval with a uniform classification
                boundaries.append((leftBound, rightBound))
                leftBound = rightBound
        
        # last interval has positive infinity as right boundary
        boundaries.append((leftBound, np.PINF))
        
        # if the getBoundaries function returns more then 10 intervals
        # set intervals independent of classification
        if len(boundaries) > 10:
            return self.setBoundaries(aColumn)
        
        return boundaries
    
    def setBoundaries(self, aColumn):
        # if the getBoundaries function returns more then 10 intervals
        # sets 10 eaqually sized intervals indipendent of classification
        
        # calculate size of intervals
        maximum = np.max(aColumn)
        minimum = np.min(aColumn)
        stepsize = (maximum - minimum)/ 10
        boundaries = []
        
        # make a tupel for each interval
        leftBound = np.NINF
        rightBound = minimum + stepsize
        for i in range(9):
            boundaries.append((leftBound, rightBound))
            leftBound = rightBound
            rightBound = leftBound + stepsize
        boundaries.append((leftBound, np.PINF))
        
        return boundaries
        
    
    def replaceContinuous(self, boundaries, aColumn):
        # replaces the continuous values of an attribute by the
        # tuples that represent an interval
        
        newAColumn = []
        for value in aColumn:
            # find the interval that includes the value
            foundInterval = False
            for l, r in boundaries:
                if value >= l and value < r:
                    newAColumn.append((l, r))
                    foundInterval = True
                    break
            if foundInterval == False:
                raise TypeError("could not find and interval for ", value)
        
        return pd.Series(newAColumn)
    

    def sortIntervals(self, unsortedV):
        # sortes the intervals according to their value
        sortedV = []
        leftBound = np.NINF
        for value in unsortedV:
            if value[0] == leftBound:
                leftBound = value[1]
                sortedV.append(value)
        return sortedV
    
    #######################################
    ## methods for choosing an attribute ##
    #######################################
    
    
    def entropy(self, targetColumn):
        values = set(targetColumn)
        entropySum = 0

        # Itterates through all values and calculates the sum of the entropies of the values
        for value in values:
            p = list(targetColumn).count(value) / len(targetColumn)
            entropySum = entropySum + (- p * np.log(p))

        return entropySum
    

    def informationGain(self, attributeColumn, values):
        # calculates the informationGain
        gainSum = 0
        for value in values:
            mask = lambda aColumn, value :(row == value for row in aColumn) 
            subsetData = self.data.iloc[mask(attributeColumn, value),:]
            subsetTargetColumn = subsetData[self.target]
            # claculate entropy and normalize by size of subsets
            gainSum = gainSum + (len(subsetData)/ len(self.data)) * self.entropy(subsetTargetColumn)

        # substract summed and weighted entropy of subsets from entropy of whole set
        infoGain = self.entropy(self.data.loc[:, self.target]) - gainSum

        return infoGain

    def gainRatio(self, attributeColumn, values):
        # calculating the Gain Ratio instead of the InforamtionGain
        # to prefer attributes with few values
        infoGain = self.informationGain(attributeColumn, values)
        
        splitInfo = 0.0
        for value in values:
            subset = attributeColumn[attributeColumn == value]
            # proportion of subset size and whole set size
            s = len(subset) / len(attributeColumn)
            if s != 0.0:
                splitInfo = splitInfo + ((- s) * np.log(s))
        
        # to avoid dividing by zero
        if splitInfo == 0:
            splitInfo = infoGain
            if infoGain == 0:
                return 0
            
        return infoGain / splitInfo
        

    def chooseAttribute(self):
        # chooses an attribute that maximises GainRatio
        maxGain= 0
        maxAttribute = ""

        # calculate Gain Ratio for each attribute
        for attribute in self.attributes:
            attributeColumn = self.data[attribute]
            values = set(attributeColumn)
            gain = 0

            # replace the continuous values in attributeColumn by Intervals
            if self.isContinuous(values):
                targetColumn = self.data[self.target]
                boundaries = self.getBoundaries(targetColumn, attributeColumn)
                attributeColumn = self.replaceContinuous(boundaries, attributeColumn)
                values = set(attributeColumn)            
                
            # calculate gainRatio
            gain = self.gainRatio(attributeColumn, values)

            # store attribute with highest information gain
            if gain >= maxGain:
                maxGain = gain
                maxAttribute = attribute

        # choose attribute with highest Information Gain
        return maxAttribute


    ############################################
    ## methods for building the decision tree ##
    ############################################
    
    def classify(self):
        # returns the most commen classification of the dataset
        targetColumn = self.data.loc[:, self.target]
        values = set(targetColumn)
        maxClass = 0  # highest number of values
        classification = "" # classification of most common value

        for value in values:
            # check if calssification value is more common then other classification values
            if list(targetColumn).count(value) > maxClass:
                maxClass = list(targetColumn).count(value)
                classification = value

        return classification
    

    def id3(self):

        # BASE CASES:

        # 1) all instances have same target value: 
        #    -> create a leaf node with target value as classification
        if (self.data[self.target].nunique() == 1):
            self.node.setClassification(self.data[self.target].iloc[0])
            return 

        # 2) out of desciptive features (list of attributes to choose is empty) 
        #    -> create a leaf node with majority of target values as classification
        if (not self.attributes):
            self.node.setClassification(self.classify())
            return

        # 3) no instances left in dataset 
        #    -> take majority of target values of the parent node as classification
        if (self.data is None):
            parent = self.node.getParent()
            self.node.setClassification(parent.getClassification())
            return

        # 4) when recursion depth is limited and limit is reached:
        #    -> no further splitting of the data, 
        #       current node is leaf node with majority of target values as classification
        if self.recursion_depth >= self.max_recursion:
            self.node.setClassification(self.classify())
            return


        # RECURSIVE CASE:

        # choose attribute with highest explainatory power
        attribute = self.chooseAttribute()

        # set the attribute and the classification of the current node
        self.node.setAttribute(attribute)
        self.node.setClassification(self.classify())

        # split data according to attribute
        attributeColumn = self.data.loc[:, attribute]
        values = set(attributeColumn)

        # make a new list of attributes without the current attribute
        new_attributes = self.attributes
        new_attributes.remove(attribute)
        
        # add 1 to the recursion_depth
        recursion_depth = self.recursion_depth + 1

        # when chosen attribute is a continuous variable:
        # replace the continous values with corresponding boundary tuples
        valueIsContinuous = False
        if self.isContinuous(values):
            targetColumn = self.data[self.target]
            boundaries = self.getBoundaries(targetColumn, attributeColumn)
            attributeColumn = self.replaceContinuous(boundaries, attributeColumn)
            values = set(attributeColumn)
            values = self.sortIntervals(values)
            valueIsContinuous = True

        
        # create leaf node for each attribute value
        for value in values:
            # get the subset determined by the attribute value
            mask = lambda aColumn, value :(row == value for row in aColumn) 
            subsetData = self.data.iloc[mask(attributeColumn, value),:]
            # create a node in the tree
            childNode = Node(parent=self.node, value=value, valueIsContinuous=valueIsContinuous, target=self.target)
            self.node.setChild(childNode)
            # train the node with the data subset
            subset = Train_data(data=subsetData, 
                                target=self.target, 
                                attributes=new_attributes, 
                                node=childNode, 
                                recursion_depth=recursion_depth, 
                                max_recursion = self.max_recursion)

            # recursive call on all partitions                    
            subset.id3() 
            

## Test_data Class

In [4]:
class Test_data:

    '''
    The class Test_data tests the accuracy of a decisiontree with the testing set of the data.

    The function classify() is a recursive function and returns the 
    decisiontree's classification of this datapoint.
    The function classifySet() returns the classification of the whole test data.
    The function accuracy() is for calculating the error rate of the decisiontree. 
    The calculated error lies inbetween 0 and 1 where 1 indicates a fully right classification 
    of the testing set and 0 indicaties a fully wrong classification.

    The class only takes the root node of a trained decisiontree for testing 
    and the test data has to be a pd.Dataframe.
    '''
    
    def __init__(self, testData:pd.DataFrame, target, node:Node):
        self.testData = testData
        self.target = target
        self.rootNode = node

        # check whether node is trained:
        if node.getAttribute() is None:
            raise TypeError("node has to be part of a trained Decisiontree")

    
    def classify(self, datapoint, node):
        
        # basecase 1: 
        # get the classification of the leaf node
        if node.isLeaf() == True:
            return node.getClassification()


        # recursive case:
        # traverse the tree according to attributes and values:
        
        # get the attribute of the current node and the attribute value of the datapoint
        attribute = node.getAttribute()
        dataValue = datapoint.loc[attribute]

        # itterate through all children of the node
        for child in node.getChildren():
            cValue = child.getValue()
            
            # for interval values
            if child.valueIsContinuous:
                # check whether datapoint lies inbetween the boundaries
                if dataValue >= cValue[0] and dataValue < cValue[1]:
                    return self.classify(datapoint, child)
            # for discrete values
            elif cValue is dataValue:
                return self.classify(datapoint, child)
        
        # basecase 2:
        # if there are no children with the right value at decision node, get current classification
        return node.getClassification()
        

    def classifySet(self):
        classes = []
        
        # itterate through the test data and classify each datapoint
        for i in range(len(self.testData)):
            datapoint = self.testData.iloc[i]
            classes.append(self.classify(datapoint, self.rootNode))
        return classes
    

    def accuracy(self):
        classes = self.classifySet()
        targets = self.testData[self.target]
        
        errors = []
        # itterates through targets and classes
        # checks whether the classification of the decisiontree was right or not
        for target, classification in zip(targets, classes):
            if target == classification:
                errors.append(1)
            else:
                errors.append(0)
                
        return np.mean(errors)


## prepare_data function

In [5]:
def prepare_data(data:pd.DataFrame, tratio = 0.1):
    
    # remove any Nans from Dataframe
    data = data.dropna(how='any')
    
    # shuffle data
    data = data.sample(frac=1, random_state=1).reset_index(drop=True)
    
    # variables to return
    testData = []
    trainingData = []

    # check whether tratio is smaller than 1
    if tratio >= 1:
        raise TypeError("tratio has to be smaller than 1")
    
    # get length of dataframe
    dataLength = data.shape[0]
    # get chunck size:
    chunkSize = int(dataLength * tratio) 
    # get number of chunks
    nr_chunks = int(1/tratio)

    # append data set to existing dataset
    doubleData = data.append(data)

    # itterate through doubleData, assigh a certain chunk to testing and training
    for chunk in range(nr_chunks-1):
        testData.append(doubleData.iloc[chunkSize*chunk: chunkSize*(chunk+1), :])
        trainingData.append(doubleData.iloc[chunkSize*(chunk+1): chunkSize*(nr_chunks+chunk), :])
    
    return [testData, trainingData]

## Random_forest Class

In [6]:
class Random_forest:
    
    '''
    The class Random_forest implements a reandom forest, made of multiple decision trees.
    Each Tree is trained on a different training set, though the training sets might share some datapoints (bag of features).
    The random forest classifies datapoints by looking at the classification from each tree 
    and then choosing the most common one.
    
    The Random forest is first trained with train() and then can be used to classify datapoints with 
    classifySet() or the accuracy can be calculated directly with accuracy()
    
    The prepare_data() function splits the data into a training and a testing set according to the testRatio and then
    samples the training chunks from the training set according to the trainRatio.
    
    The variable trainRatio specifies how large a training set is, in realtion to the wohle data set, 
    testRatio does the same for the testing set. nrTrees indicates how many decision trees are making up the forest.
    
    '''

    def __init__(self, data, target, trainRatio = 0.1, nrTrees = 10, testRatio = 0.3):

        self.data = data
        self.target = target
        self.trees = []
        self.trainRatio = trainRatio 
        self.nrTrees = nrTrees
        self.testRatio = testRatio
        self.testingSet = None

    
    def train(self):

        trainingSets = self.prepare_data()

        # train a tree for each training set
        for trainingSet in trainingSets:

            attributes = list(trainingSet.columns)
            attributes.remove(self.target)

            rootNode = Node()
            decisionTree = Train_data(data=trainingSet, target=self.target, attributes=attributes, node=rootNode, max_recursion = np.PINF)
            decisionTree.id3()
            self.trees.append(rootNode)
        
    def prepare_data(self):
    
        # check whether Ratios are acceptable
        if self.testRatio >= 1:
            raise TypeError("Testing ratio has to be smaller than 1")
            
        if self.trainRatio >= 1:
            raise TypeError("Training ratio has to be smaller than 1")
        
            
        # remove any Nans from Dataframe
        data = self.data.dropna(how='any')

        # shuffle data
        data = data.sample(frac=1, random_state=1).reset_index(drop=True)
        
        # split of the testing set
        self.testingSet = data.iloc[0:int(len(data) * self.testRatio), :]
        trainingData = data.iloc[int(len(data) * self.testRatio):, :]

        # split training Data into random sets
        trainingSize = int(self.trainRatio * len(data))
        trainingSets = []
        for tree in range(self.nrTrees):
            # suffel dataset before extracting traingset of correct size
            trainingData = trainingData.sample(frac=1, random_state=1).reset_index(drop=True) #random shuffel
            trainingSets.append(trainingData.iloc[:trainingSize, :])
        
        return trainingSets
    
    def classifySet(self, testingSet):
        
        # for each tree get classification list
        classifications = []
        for tree in self.trees:
            testData = Test_data(testingSet, self.target, tree)
            classifications.append(testData.classifySet())
        
        # for each datapoint get the most common classification
        votedClass = []
        for i in range(len(testingSet)):
            votes = []
            for classList in classifications:
                votes.append(classList[i])
            votedClass.append(max(set(votes), key = votes.count))

        return votedClass
    
    def accuracy(self):
        
        # get classification of datapoints
        votedClasses = self.classifySet(self.testingSet)
        
        # for each datapoint, check if classification is accurate
        targets = self.testingSet[self.target]
        errors = []
        for target, classification in zip(targets, votedClasses):
            if target == classification:
                errors.append(True)
            else:
                errors.append(False)

         # calculate accuracy        
        return np.mean(errors)


## main

In [7]:

# datasets and the corresponding target features for training the decision trees
seabornToyData = [("penguins", "species"), ("titanic", "survived"), ("iris", "species"), ("tips", "sex")]

for dataset,target in seabornToyData:

    # 1. load data
    data = sns.load_dataset(dataset)
    if dataset == "titanic":
        data = data.drop("alive", axis=1)

    # 2. prepare training and testing chunks
    trainingSets, testingSets = prepare_data(data, 0.1)

    # 3. train a tree for each chunk of the training set -> 10 trees
    decisionTrees = []
    for trainingSet in trainingSets:

        attributes = list(trainingSet.columns)
        attributes.remove(target)

        rootNode = Node()
        decisionTree = Train_data(data=trainingSet, target=target, attributes=attributes, node=rootNode, max_recursion = np.PINF)
        decisionTree.id3()
        decisionTrees.append(rootNode)

    # 4. test each tree with the corresponding testing chunk
    accuracies = []
    for testingSet, tree in zip(testingSets, decisionTrees):

        testData = Test_data(testingSet, target, tree)
        accuracies.append(testData.accuracy())

    # 5. print tree with best accuracy score
    maxAcc = np.max(accuracies)
    bestTree = decisionTrees[np.argmax(accuracies)]
    print("\n\nDecision tree trained on ", dataset ," Dataset for classifying ", target)
    print("accuracy score: ", maxAcc)
    print(bestTree.printTree())
    
    # 6. build a decision Forest with same training set size for each tree
    forest = Random_forest(data, target, trainRatio = 0.1, nrTrees = 15, testRatio = 0.1)
    forest.train()
    fAcc = forest.accuracy()
    
    # 7. check weather decision Forest is more accurate
    print("A random forest trained on the same dataset reaches an accuracy of ", fAcc)
    if fAcc > maxAcc:
        print("The random forest performs better then the best decision Tree")
    elif fAcc < maxAcc:
        print("The random forest performs worse then the best decision Tree")
    else:
        print("The random forest performs equal to the best decision Tree")

  doubleData = data.append(data)




Decision tree trained on  penguins  Dataset for classifying  species
accuracy score:  0.9764309764309764
Decision tree:
 island :  Dream
     bill_length_mm :  smaller then 45.05
         classification:  species  =  Adelie
     bill_length_mm :  bigger then 45.05
         classification:  species  =  Chinstrap
 island :  Biscoe
     flipper_length_mm :  smaller then 206.5
         classification:  species  =  Adelie
     flipper_length_mm :  bigger then 206.5
         classification:  species  =  Gentoo
 island :  Torgersen
     classification:  species  =  Adelie
None
A random forest trained on the same dataset reaches an accuracy of  1.0
The random forest performs better then the best decision Tree


  doubleData = data.append(data)




Decision tree trained on  titanic  Dataset for classifying  survived
accuracy score:  0.691358024691358
Decision tree:
 alone :  False
     fare :  smaller then 23.2
         classification:  survived  =  1
 alone :  True
     classification:  survived  =  0
None
A random forest trained on the same dataset reaches an accuracy of  0.7222222222222222
The random forest performs better then the best decision Tree


  doubleData = data.append(data)




Decision tree trained on  iris  Dataset for classifying  species
accuracy score:  0.9481481481481482
Decision tree:
 petal_length :  smaller then 2.45
     classification:  species  =  setosa
 petal_length :  between 2.45 and 4.9
     classification:  species  =  versicolor
 petal_length :  bigger then 4.9
     classification:  species  =  virginica
None
A random forest trained on the same dataset reaches an accuracy of  1.0
The random forest performs better then the best decision Tree


  doubleData = data.append(data)




Decision tree trained on  tips  Dataset for classifying  sex
accuracy score:  0.6435185185185185
Decision tree:
 size :  2
     total_bill :  smaller then 11.78
         classification:  sex  =  Male
 size :  3
     tip :  2.5
         classification:  sex  =  Female
     tip :  3.71
         classification:  sex  =  Male
     tip :  3.5
         classification:  sex  =  Male
     tip :  3.0
         classification:  sex  =  Female
     tip :  4.0
         classification:  sex  =  Female
 size :  4
     classification:  sex  =  Male
None
A random forest trained on the same dataset reaches an accuracy of  0.7083333333333334
The random forest performs better then the best decision Tree
