In [75]:
!pip install anytree



In [76]:
import numpy as np
import pandas as pd

from typing import List, Dict
from anytree import NodeMixin, RenderTree , LevelOrderIter
from anytree.search import find_by_attr, findall_by_attr
from math import log2

pd.set_option('display.max_columns', 20)

### Loading Data

In [77]:
data  = pd.read_csv("data.csv")
data

Unnamed: 0,outlook,temp,humidity,windy,play
0,sunny,hot,high,False,no
1,sunny,hot,high,True,no
2,overcast,hot,high,False,yes
3,rainy,mild,high,False,yes
4,rainy,cool,normal,False,yes
5,rainy,cool,normal,True,no
6,overcast,cool,normal,True,yes
7,sunny,mild,high,False,no
8,sunny,cool,normal,False,yes
9,rainy,mild,normal,False,yes


### Implementing Decision Tree 

In [78]:
from __future__ import annotations
class DecisionTreeNode(NodeMixin):
    """
    Represents a Node of Decision Tree.
    """
    def __init__(self, name: str, attribute: str, parent: DecisionTreeNode = None):
        """
        Args:
            name(str): Tree Node name.
            attribute:(str): Node attribute/feature/column_name
            parent(DecisionTreeNode): Default->None. Node parent reference.
        """
        super(DecisionTreeNode, self).__init__()
        self.name = name
        self._attribute = attribute
        self.parent = parent
        self._inf_gain = 0.0
        self.attr_value = "{0}={1}".format(attribute, name)
    
    @property
    def attribute(self):
        return self._attribute
    @attribute.setter
    def attribute(self, label):
        self._attribute = label
    @property
    def inf_gain(self):
        return self._inf_gain
    @inf_gain.setter
    def inf_gain(self,entr):
        self._inf_gain = entr
    
from functools import reduce 
import operator

class DecisionTree():
    """
    A simple Implementation of decision tree using ID3 recursive algorithm.
    """
    def __init__(self, data: pd.DataFrame, target_attribute: str):
        """
        Args:
            data(pd.DataFrame): training data.
            target_attribute(str): lan
        """
        self.data = data
        self.target_attr = target_attribute
        self.target_attr_vals = data[target_attribute].unique()
        self.root_node = None
        
    def pmf_target(self, df: pd.DataFrame,attr:str = None) -> Dict[str, float]:
        """
        Takes a dataset as input (pandas DataFrame) and returns a dictionary containing
        the target attribute values as keys and their normalized counts as values. 
        Args:
            df(pd.DataFrame): Dataset stored in DataFrame.
        Returns:
            (dict)
        """
        if isinstance(df,pd.DataFrame):
            try:
                if attr == None:
                    pmf = (df[self.target_attr].value_counts())/df.shape[0]
                else:
                    pmf = (df.value_counts())/df.shape[0]
            except:
                raise Exception('Key Error')
            return (pmf).to_dict()
        else:
            raise Exception(f'Expected type pd.DataFrame got->{type(df)}')
        
    
    def entropy(self, pmf: Dict[str, float]) -> float:
        """
        Calculates the entropy given the classes and there respective probabilty in the dataset.
        Supports pandas Series as argument.
        Args:
            pmf(dict): Dictionary containing classes and there respective probabilities.
        Return:
            (float)
        """
        if isinstance(pmf,pd.core.series.Series):
            pmf = pmf.to_dict()
            
        if isinstance(pmf,dict):
            #-pi*log2(pi)
            entr = [-x*log2(x) if x!=0 else 0  for x in pmf.values()]
            #-Σpi*log2(pi)
            entr = reduce(operator.__add__, entr) 
            return entr
        else:
            raise Exception(f'Expected type dict got->{type(pmf)}')
        
    def cal_entropy_df(self, df: pd.DataFrame,attr:str=None) -> float:
        """
         Take a DataFrame as input and use the pmf_target and entropy 
         methods to compute the entropy.
         Args:
             df(pd.DataFrame): DataFrame containing dataset
         Return:
             (float): Entropy
        """
        if isinstance(df,pd.Series):
            df=df.to_frame()
        if isinstance(df,pd.DataFrame):
            if hasattr(self,'pmf_target'):
                pmf = self.pmf_target(df,attr)
                if hasattr(self,'entropy'):
                    entr = self.entropy(pmf)
                    return entr
                else:
                    raise Exception(f"Method Implementation 'entropy' not found in class->{self.__name__}")
            else:
                raise Exception(f"Method Implementation 'pmf_target' not found in class->{self.__name__}")
        else:
            raise Exception(f'Expected pd.DataFrame, got->{type(df)}')
    
    def info_gain_attribute(self, df: pd.DataFrame, attribute: str) -> float:
        """
        Take a DataFrame and attribute/column name as input. It will compute the information gain
        value achieved when splitting is done at that column.
        Args:
            df(pd.DataFrame) : 
            attribute(str) :
            node(DecisionTreeNode) : Default=None, parent Node to calculate information gain.
        Returns:
            (float) : information gain of split.
        """
    
        if isinstance(df,pd.DataFrame):
            if hasattr(self,'cal_entropy_df'):
                #for attributes in each split,e.g outlook has sunny,overcast,rainy,so select them each calculate weighted entropy.
                entropies = [self.cal_entropy_df(df[df[attribute] == attr][self.target_attr]) for attr in df[attribute].unique()]
                #weigths for respective sub-category. #calculate weighted average ni/N
                weights = [df[df[attribute] == attr][self.target_attr].size/df[attribute].size for attr in df[attribute].unique()]
                #multiply weight*entropy(splits).
                weigthed_sum_entropy = [e*w for e,w in zip(entropies,weights)]
                #Σweight*entropy(splits)
                entr = reduce(operator.__add__, weigthed_sum_entropy)
                #information gain = entropy(parent) - Σweight*entropy(split) 
                information_gain = self.cal_entropy_df(df[self.target_attr]) - entr
            else:
                raise Exception(f"Method Implementation 'cal_entropy_df' not found in class->{self.__name__}")
            return information_gain
        else:
            raise Exception(f'Expected pd.DataFrame, got->{type(df)}')
    
    def max_info_gain_attribute(self, df: pd.DataFrame, attributes: List[str]) -> str:  
        """
        Take a DataFrame and a list of attribute names as input and will return 
        the attribute with the highest information gain.
        Args:
            df(pd.DataFrame):
            attributes(list(str))
        Returns:
            (str) : attribute with maximum information gain. 
        """
        if isinstance(df,pd.DataFrame):
            if hasattr(self,'info_gain_attribute'):
                gains = [self.info_gain_attribute(df,attr) for attr in attributes]
                max_gain = max(gains)
                max_attr = gains.index(max_gain)
                return attributes[max_attr] , max_gain
            else:
                raise Exception(f"Method Implementation 'info_gain_attribute' not found in class->{self.__name__}")
        else:
            raise Exception(f'Expected pd.DataFrame, got->{type(df)}')
    
    def build_tree_infgain(self, df: pd.DataFrame, attr_list: List[str], start_node: DecisionTreeNode):
        """
        Implements the decision tree construction logic according to the recursive ID3 algorithm.
        Args:
            df(pd.DataFrame) : DataFrame containing dataset.
            attr_list(List): Feature name list
            start_node(parent_node) : imediate parent node for each traversal.
        Returns:
            (DecisionTreeNode): 
        """
        #If all examples are positive, Return the single-node tree Root, with label = +
        #If all examples are negative, Return the single-node tree Root, with label = -
        if len(df[self.target_attr].unique()) == 1:
            #leaf node label
            label = df[self.target_attr].unique()
            #create leaf node.
            DecisionTreeNode(label[0], "class", start_node)
            return start_node
        
        if len(attr_list) == 0:
            max_attr = df[self.target_attr].value_counts().to_dict().keys()
            #leaf node label
            label = list(max_attr)[0]
            #create leaf node.
            DecisionTreeNode(label, "class", start_node)
            return start_node
        
        #First get the attribute that best classifies dataset.
        max_attr , gain = self.max_info_gain_attribute(df,attr_list)
    
        for attr in df[max_attr].unique():
            attr_node = DecisionTreeNode(attr, max_attr, start_node)
            #create subset
            subset = df[df[max_attr] == attr].drop(max_attr,axis=1)
            if len(subset.index) == 0:
                max_attr = df[self.target_attr].value_counts().to_dict().keys()
                #leaf node label
                label = list(max_attr)[0]
                #create leaf node.
                DecisionTreeNode(label, "class", start_node)
            else:
                if max_attr in attr_list:
                    #remove the attribute on which split has performed.
                    attr_list.remove(max_attr)
            self.build_tree_infgain(subset,attr_list,attr_node)
            
        return start_node
    
    def generate_tree(self):
        """
        Generate a decision tree based on the given training test.
        Args:
            (self):
        """
        attributes = self.data.columns.to_list()
        #remove labels.
        attributes.remove(self.target_attr)
        #create start Node.
        start_node = DecisionTreeNode("start", "start")
        #set root node as start node.
        self.parent_node = start_node
        #create tree.
        self.build_tree_infgain(self.data,attributes,start_node)
        self.root_node = start_node
    
    def print_tree(self):
        """
        Render the tree.
        Args:
            (self):
        """
        for pre, _, node in RenderTree(self.root_node):
            print(f"{pre} {node.attribute}={node.name} ")
            
    def tree_depth(self):
        """
        Returns tree maximum depth:
        Args:
            (self):
        """
        depth = []
        for node in LevelOrderIter(self.parent_node):
            if node.is_leaf:
                depth.append(node.depth)
        return max(depth)
    
    def predict(self, X: pd.DataFrame) -> List[str]:
        """
        Performs inference on the given test dataset. Perform LevelOrderTraversal on the tree,
        given a attribue node.
        Args
            X(pd.DataFrame) : Training data.
        Returns:
            (list(str)) : Predicted labels by model.
        """
        #get the depth of tree.
        depth = self.tree_depth() 
        predicitions = []
        #O(rows*depth)-> O(depth)=constant -> O(rows)
        for index, row in X.iterrows():
            start_node = self.parent_node
            #O(depth)
            for i in range(depth):
                nodes_dict = {node.name:node for node in LevelOrderIter(start_node, maxlevel=2)}
                i_attr = list(nodes_dict.values())[1].attribute
                #leaf node
                if i_attr == 'class':
                    predicitions.append(list(nodes_dict.keys())[1])
                    break
                else:
                    val = row[i_attr]
                #set start_node from the first level label
                start_node = nodes_dict[val]
        return predicitions
    
            

### Train Decision Tree 

In [79]:
dec_tree = DecisionTree(data, "play")
dec_tree.generate_tree()

dec_tree.print_tree()

 start=start 
├──  outlook=sunny 
│   ├──  humidity=high 
│   │   └──  class=no 
│   └──  humidity=normal 
│       └──  class=yes 
├──  outlook=overcast 
│   └──  class=yes 
└──  outlook=rainy 
    ├──  windy=False 
    │   └──  class=yes 
    └──  windy=True 
        └──  class=no 


### Perform Inference 

In [80]:
test_data=data.drop('play',axis=1)
predictions = dec_tree.predict(test_data)
acc = accuracy_score(data['play'].to_list(), predictions)
print(acc)

1.0


### Evaluate on the Cars Dataset 

In [81]:
dt_train = pd.read_csv("cars_train.csv")
dt_test = pd.read_csv("cars_test.csv")

dt_test.head()

Unnamed: 0,buying_price,maintenance_cost,num_doors,num_persons,size_luggage,safety,decision
0,vhigh,med,2,2,small,low,unacc
1,high,high,3,4,big,med,acc
2,low,med,4,2,big,low,unacc
3,vhigh,high,5more,4,small,low,unacc
4,med,med,4,2,med,high,unacc


In [84]:
# Train
dec_tree = DecisionTree(dt_train, "decision")
dec_tree.generate_tree()
dec_tree.print_tree()

 start=start 
├──  safety=high 
│   ├──  num_persons=more 
│   │   ├──  maintenance_cost=vhigh 
│   │   │   ├──  buying_price=med 
│   │   │   │   ├──  num_doors=2 
│   │   │   │   │   └──  class=unacc 
│   │   │   │   ├──  num_doors=5more 
│   │   │   │   │   └──  class=acc 
│   │   │   │   ├──  num_doors=3 
│   │   │   │   │   └──  class=acc 
│   │   │   │   └──  num_doors=4 
│   │   │   │       └──  class=acc 
│   │   │   ├──  buying_price=high 
│   │   │   │   └──  class=unacc 
│   │   │   ├──  buying_price=low 
│   │   │   │   └──  class=acc 
│   │   │   └──  buying_price=vhigh 
│   │   │       └──  class=unacc 
│   │   ├──  maintenance_cost=low 
│   │   │   ├──  size_luggage=small 
│   │   │   │   └──  class=acc 
│   │   │   ├──  size_luggage=med 
│   │   │   │   └──  class=acc 
│   │   │   └──  size_luggage=big 
│   │   │       └──  class=acc 
│   │   ├──  maintenance_cost=med 
│   │   │   └──  class=acc 
│   │   └──  maintenance_cost=high 
│   │       └──  class=acc 
│   ├──  n

In [85]:
# Test
dt_test_x = dt_test.drop(columns="decision")
dt_test_y = dt_test["decision"].to_list()

preds = dec_tree.predict(dt_test_x)

In [86]:
print(len(preds))

100


In [87]:
# Evaluate results
from sklearn.metrics import accuracy_score, confusion_matrix

acc = accuracy_score(dt_test_y, preds)
cm = confusion_matrix(dt_test_y, preds)
print(f"Accuracy: {acc}")
print(cm)

Accuracy: 0.85
[[10 14]
 [ 1 75]]
