In [1]:
import pandas as pd
import math
import random
import numpy as np

In [2]:
# Read the raw breast-cancer CSV and preview its first ten rows.
breast_cancer = pd.read_csv("./breast-cancer_csv.csv")
breast_cancer.head(n=10)

Unnamed: 0,age,menopause,tumor-size,inv-nodes,node-caps,deg-malig,breast,breast-quad,irradiat,Class
0,40-49,premeno,15-19,0-2,yes,3,right,left_up,no,recurrence-events
1,50-59,ge40,15-19,0-2,no,1,right,central,no,no-recurrence-events
2,50-59,ge40,35-39,0-2,no,2,left,left_low,no,recurrence-events
3,40-49,premeno,35-39,0-2,yes,3,right,left_low,yes,no-recurrence-events
4,40-49,premeno,30-34,3-5,yes,2,left,right_up,no,recurrence-events
5,50-59,premeno,25-29,3-5,no,2,right,left_up,yes,no-recurrence-events
6,50-59,ge40,40-44,0-2,no,3,left,left_up,no,no-recurrence-events
7,40-49,premeno,10-14,0-2,no,2,left,left_up,no,no-recurrence-events
8,40-49,premeno,0-4,0-2,no,2,right,right_low,no,no-recurrence-events
9,40-49,ge40,40-44,15-17,yes,2,right,left_up,yes,no-recurrence-events


In [149]:
# Re-read the CSV, forward-fill missing cells and drop the 'Class' column.
breast_cancer = (
    pd.read_csv("./breast-cancer_csv.csv")
    .ffill()  # replaces fillna(method='ffill'), which recent pandas deprecates
    .drop(columns='Class')
)

# Map every remaining column (the target 'irradiat' included) to the set of
# values observed for it; the entropy helpers below iterate over these sets.
attributes_and_values = {
    column: set(breast_cancer[column]) for column in breast_cancer.columns
}
for column, values in attributes_and_values.items():
    print(column, values)

# Candidate split attributes: every column except the target.
attributes = breast_cancer.columns.drop('irradiat')

age {'60-69', '50-59', '30-39', '70-79', '20-29', '40-49'}
menopause {'ge40', 'lt40', 'premeno'}
tumor-size {'15-19', '50-54', '35-39', '40-44', '25-29', '0-4', '20-24', '30-34', '45-49', '5-9', '10-14'}
inv-nodes {'0-2', '9-11', '15-17', '3-5', '24-26', '12-14', '6-8'}
node-caps {'yes', 'no'}
deg-malig {1, 2, 3}
breast {'right', 'left'}
breast-quad {'left_low', 'left_up', 'right_low', 'central', 'right_up'}
irradiat {'yes', 'no'}


In [150]:
def total_entropy(data: pd.DataFrame, attribute, attribute_values = attributes_and_values):
    """Return the base-2 Shannon entropy of column `attribute` in `data`.

    Parameters
    ----------
    data : pd.DataFrame
        Rows to measure.
    attribute : hashable
        Name of the column whose value distribution is measured.
    attribute_values : mapping
        Maps a column name to the collection of values to consider for it.

    Returns
    -------
    The entropy in bits; 0 when `data` is empty or none of the listed values
    occurs in it.
    """
    size = len(data)
    if size == 0:
        # No rows means no uncertainty; also avoids dividing by zero below.
        return 0
    entropy = 0
    for value in attribute_values[attribute]:
        value_count = len(data[data[attribute] == value])
        if value_count == 0:
            # An absent value contributes nothing; without this guard
            # 0 * log2(0) evaluates to NaN and poisons the whole sum.
            continue
        value_prob = value_count / size
        entropy -= value_prob * np.log2(value_prob)
    return entropy
def calc_entropy(filtered_data:pd.DataFrame,attribute,attribute_values = attributes_and_values):
    """Base-2 entropy of column `attribute` within `filtered_data`.

    Only the values listed in `attribute_values[attribute]` are considered,
    and values that do not occur in `filtered_data` add nothing to the result.
    """
    total_rows = len(filtered_data)
    column = filtered_data[attribute]
    result = 0
    for candidate in attribute_values[attribute]:
        occurrences = len(filtered_data[column == candidate])
        if occurrences == 0:
            continue  # absent value: zero contribution
        share = occurrences / total_rows
        result -= share * np.log2(share)
    return result

def information_gain(data:pd.DataFrame,attribute,target = 'irradiat',attribute_values=attributes_and_values):
    """Information gain on `target` obtained by splitting `data` on `attribute`.

    Computed as the entropy of `target` over all of `data` minus the
    size-weighted entropy of `target` inside each subset that shares one value
    of `attribute` (values are taken from `attribute_values[attribute]`).
    """
    n_rows = len(data)

    def _weighted_subset_entropy(value):
        # Entropy of the target among rows holding `value`, scaled by the
        # fraction of rows that hold it.
        subset = data[data[attribute] == value]
        subset_entropy = calc_entropy(subset, target, attribute_values)
        return (len(subset) / n_rows) * subset_entropy

    conditional_entropy = sum(
        _weighted_subset_entropy(value) for value in attribute_values[attribute]
    )
    return total_entropy(data, target, attribute_values) - conditional_entropy

In [151]:
def most_info_gain(data:pd.DataFrame,attributes = None,target = 'irradiat',attributes_and_values = attributes_and_values):
    """Return the attribute whose split yields the highest information gain.

    Parameters
    ----------
    data : pd.DataFrame
        Rows to evaluate the candidate splits on.
    attributes : iterable of column names, optional
        Candidate split attributes. When omitted, every column of `data`
        except `target` is considered.
    target : column name
        Column holding the class to predict.
    attributes_and_values : mapping
        Maps a column name to the collection of values it can take.

    Returns
    -------
    The candidate with the largest gain; the first one wins on ties.

    Raises
    ------
    ValueError
        If there is no candidate attribute to choose from.
    """
    if attributes is None:
        # The declared default used to crash in the loop below.
        attributes = data.columns.drop(target)

    info_gain = {
        attr: information_gain(data, attr, target, attributes_and_values)
        for attr in attributes
    }
    if not info_gain:
        raise ValueError("most_info_gain() needs at least one candidate attribute")
    return max(info_gain, key=info_gain.get)

ID3 decision tree algorithm code


In [153]:
class Node:
    """Node of the dict-based decision tree.

    Children are stored in `childs`, keyed by the value of the split
    attribute that leads to them. The reserved key
    "most common target value" holds a fallback label instead of a child node.
    """

    def __init__(self) -> None:
        self.attr_value = None  # value of prev_attr that leads to this node
        self.prev_attr = None   # attribute the parent node split on
        self.attr = None        # attribute this node splits on next
        self.childs = {}        # children, if any

    def __str__(self, level=0) -> str:
        """Render this node and its subtree, one line per node.

        Each line is prefixed with `level ** 2` dashes.
        """
        indent = "-" * (level ** 2)
        label = "" if self.attr else "target value is "
        pieces = [f"{indent}{self.prev_attr!r} {label}{self.attr_value!r}\n"]

        fallback = self.childs.get("most common target value")
        if fallback is None:
            # Regular node: append every child's rendering one level deeper.
            pieces.extend(sub.__str__(level + 1) for sub in self.childs.values())
        else:
            pieces.append(f"{indent}most common target  value is {fallback!r}\n")
        return "".join(pieces)

In [154]:
def predict(node:Node,entry):
    """Walk the tree rooted at `node` and return the label predicted for `entry`.

    `entry` is indexed by attribute name. The walk stops at a node without
    children (returning its `attr_value`) or at a node that carries a
    "most common target value" fallback (returning that fallback).
    """
    current = node
    while len(current.childs) != 0:
        fallback = current.childs.get("most common target value")
        if fallback is not None:
            return fallback
        # Follow the branch matching the entry's value of the split attribute.
        current = current.childs[entry[current.attr]]
    return current.attr_value

In [155]:
class DTClassifier:
    """ID3 decision-tree classifier over the columns of a DataFrame.

    The tree is built from `Node` objects: inner nodes store the split
    attribute in `attr` and their children in `childs` (keyed by attribute
    value); leaves store the predicted target label in `attr_value`.
    """

    def __init__(self,data:pd.DataFrame,target,attributes_and_values = attributes_and_values) -> None:
        """Store the training data and compute the entropy of the target.

        :param data: training rows, including the target column
        :param target: name of the column to predict
        :param attributes_and_values: maps each column name to the collection
            of values it can take
        """
        self.data = data
        self.attributes = data.columns.drop(target)
        self.target = target
        self.target_values = data[target]
        self.attributes_and_values = attributes_and_values
        self.entropy = total_entropy(self.data,self.target,attributes_and_values)
        self.node = None

    def train(self,k = 10):
        """Build the tree; subsets with at most `k` rows become leaves."""
        # Always start from a fresh root so that re-training does not reuse
        # the children of a previously built tree.
        self.node = self._id3_recv(self.data,self.attributes,None,k)

    def _id3_recv(self,data:pd.DataFrame,attributes : pd.Index, node,k):
        """Recursively grow the subtree for `data`.

        :param data: rows reaching this node
        :param attributes: attributes still available for splitting
        :param node: node to fill in, or None to create a new one
        :param k: subsets with at most this many rows are not split further
        :return: the filled-in node
        """
        if node is None:
            node = Node()
        target_counts = data[self.target].value_counts()
        if len(target_counts) == 1:
            # Pure subset: every row carries the same target value.
            node.attr_value = data[self.target].iloc[0]
            return node
        if len(attributes) == 0 or len(data) <= k:
            # Either no attribute is left or the subset is too small to split:
            # fall back to the most common target value.
            node.attr_value = target_counts.idxmax()
            return node

        best_next_attribute = most_info_gain(data,attributes,self.target,self.attributes_and_values)
        node.attr = best_next_attribute
        # Both are the same for every branch, so compute them once.
        majority_label = target_counts.idxmax()
        remaining_attributes = attributes.drop(best_next_attribute)
        # Use the mapping this classifier was constructed with, not the
        # module-level one.
        for value in self.attributes_and_values[best_next_attribute]:
            child = Node()
            child.attr_value = value
            child.prev_attr = best_next_attribute
            node.childs[value] = child
            child_data = data[data[best_next_attribute] == value]
            if child_data.empty:
                # No training row has this value: remember the parent's majority label.
                child.childs["most common target value"] = majority_label
            else:
                self._id3_recv(child_data,remaining_attributes,child,k)
        return node

    def test(self,test_data:pd.DataFrame):
        """Return the fraction of rows in `test_data` that are predicted correctly.

        Raises ZeroDivisionError when `test_data` has no rows.
        """
        size = len(test_data)
        hits = 0.0
        # NOTE: `predict` is looked up at call time; a later cell of this
        # notebook rebinds that name to a function for a different Node layout.
        for _,entry in test_data.iterrows():
            hits += predict(self.node,entry) == entry[self.target]
        return hits/size
            

In [159]:
from sklearn.model_selection import train_test_split
# Hold out 30% of the rows for testing and give both parts a fresh 0..n-1 index.
train, test = (
    part.reset_index(drop=True)
    for part in train_test_split(breast_cancer, test_size=0.3)
)
tree = DTClassifier(train, 'irradiat', attributes_and_values)
tree.train()
print(tree.test(test))   # accuracy on the held-out rows
print(tree.test(train))  # accuracy on the training rows
# print(tree.node)


0.7674418604651163
0.85


In [112]:

class Node:
    """Node of the tree built by DecisionTreeClassifier.

    Defining this class rebinds the name `Node`, replacing the dict-based
    class of the same name defined earlier in the notebook.
    """

    def __init__(self):
        # value:  a class label (leaf), a feature name (inner node) or a
        #         feature value (branch node stored in a parent's `childs`)
        # next:   what a branch node leads to
        # childs: list of branch nodes of an inner node
        self.value = self.next = self.childs = None

In [113]:

class DecisionTreeClassifier:
    """Decision Tree Classifier using the ID3 algorithm.

    Instances (rows) and features (columns) are addressed by position:
    instance ids are row positions of `X`, feature ids are positions in
    `feature_names` and are also used as column positions of `X`.
    """

    def __init__(self, X, feature_names, labels):
        """
        :param X: DataFrame of features, read positionally through `.iloc`
        :param feature_names: sequence with the name of each feature
        :param labels: sequence with the category of each row of X
        """
        self.X = X  # features or predictors
        self.feature_names = feature_names  # name of the features
        self.labels = labels  # categories
        # Positional copy of the labels. X is read by position, so the labels
        # must be as well; indexing a Series with `labels[i]` is label-based
        # and breaks when its index is not 0..n-1.
        self._labels = list(labels)
        self.labelCategories = list(set(self._labels))  # unique categories
        # number of instances of each category
        self.labelCategoriesCount = [self._labels.count(x) for x in self.labelCategories]
        self.node = None  # root of the tree, set by id3()
        # calculate the initial entropy of the system
        self.entropy = self._get_entropy(list(range(len(self._labels))))

    def _get_entropy(self, x_ids):
        """Calculates the entropy of the labels of the given instances.

        :param x_ids: list, instance ids
        :return: float, entropy in bits (0 for an empty list)
        """
        # labels of the selected instances
        labels = [self._labels[i] for i in x_ids]
        total = len(labels)
        # count number of instances of each category
        label_count = [labels.count(x) for x in self.labelCategories]
        # sum the entropy contribution of every category that is present
        return sum(
            -count / total * math.log(count / total, 2) if count else 0
            for count in label_count
        )

    def _get_information_gain(self, x_ids, feature_id):
        """Calculates the information gain of splitting the instances on a feature.

        :param x_ids: list, instance ids
        :param feature_id: int, feature id
        :return: float, entropy of `x_ids` minus the size-weighted entropy of
            the groups sharing one value of the feature
        """
        total_entropy_value = self._get_entropy(x_ids)
        # group the instance ids by the value they take for the feature
        ids_by_value = {}
        for x in x_ids:
            ids_by_value.setdefault(self.X.iloc[x, feature_id], []).append(x)
        # weighted entropy remaining after the split
        remaining_entropy = sum(
            len(v_ids) / len(x_ids) * self._get_entropy(v_ids)
            for v_ids in ids_by_value.values()
        )
        return total_entropy_value - remaining_entropy

    def _get_feature_max_information_gain(self, x_ids, feature_ids):
        """Finds the feature that maximizes the information gain.

        :param x_ids: list, instance ids
        :param feature_ids: list, candidate feature ids
        :returns: (feature name, feature id) of the best feature; the first
            candidate wins on ties
        """
        gains = [self._get_information_gain(x_ids, feature_id) for feature_id in feature_ids]
        max_id = feature_ids[gains.index(max(gains))]
        return self.feature_names[max_id], max_id

    def id3(self):
        """Builds the decision tree with the ID3 algorithm and stores its root in `self.node`.

        :return: None
        """
        # one id per instance and per feature
        x_ids = list(range(len(self.X)))
        feature_ids = list(range(len(self.feature_names)))
        self.node = self._id3_recv(x_ids, feature_ids, self.node)

    def _id3_recv(self, x_ids, feature_ids, node):
        """Recursive step of ID3.

        :param x_ids: list, instance ids reaching this node
        :param feature_ids: list, feature ids still available; not modified
        :param node: Node to fill in, or None to create a new one
        :returns: the Node holding the subtree for `x_ids`
        """
        if not node:
            node = Node()  # initialize node
        # labels of the instances reaching this node
        labels_in_features = [self._labels[x] for x in x_ids]
        # all instances share one class (pure node): leaf with that class
        if len(set(labels_in_features)) == 1:
            node.value = self._labels[x_ids[0]]
            return node
        # no feature left: leaf with the most frequent class
        if len(feature_ids) == 0:
            node.value = max(set(labels_in_features), key=labels_in_features.count)
            return node
        # otherwise split on the feature that maximizes the information gain
        best_feature_name, best_feature_id = self._get_feature_max_information_gain(x_ids, feature_ids)
        node.value = best_feature_name
        node.childs = []
        # Build a NEW list for the subtrees. Popping from the shared list made
        # every later sibling branch lose the features used in earlier ones.
        remaining_feature_ids = [f for f in feature_ids if f != best_feature_id]
        # value of the chosen feature for each instance
        column = [self.X.iloc[x, best_feature_id] for x in x_ids]
        for value in set(column):
            child = Node()
            child.value = value  # branch for this value of the chosen feature
            node.childs.append(child)
            child_x_ids = [x for x, v in zip(x_ids, column) if v == value]
            if not child_x_ids:
                # Only reachable for values that do not compare equal to
                # themselves (e.g. NaN): fall back to the most frequent class.
                child.next = max(set(labels_in_features), key=labels_in_features.count)
            else:
                # recursively build the subtree for this branch
                child.next = self._id3_recv(child_x_ids, remaining_feature_ids, child.next)
        return node
                


In [133]:
def create_tree(data, target='irradiat'):
    """Create an untrained DecisionTreeClassifier for `data`.

    :param data: DataFrame holding the feature columns and the target column
    :param target: name of the column to predict
    :return: DecisionTreeClassifier; call `.id3()` on it to build the tree
    """
    # Hand over the feature columns only. The classifier uses feature ids as
    # column positions of X, so leaving the target column in X misaligns names
    # and columns whenever the target is not the last column.
    features = data.drop(columns=target)
    return DecisionTreeClassifier(features, features.columns, data[target])


In [132]:
def predict(tree:Node, entry:pd.Series):
    """Predict the class of `entry` with a tree built by DecisionTreeClassifier.

    Defining this function rebinds the name `predict`, replacing the function
    of the same name defined earlier in the notebook for the dict-based tree.

    :param tree: root Node of the (sub)tree to walk
    :param entry: row to classify, indexed by feature name
    :return: the class label stored in the leaf that is reached, or None when
        the entry's value for a split feature has no branch in the tree
    """
    # Leaves are the nodes without branches. Testing for the literal labels
    # 'yes'/'no' only worked for this one target column.
    if tree.childs is None:
        return tree.value
    observed = entry[tree.value]
    for child in tree.childs:
        if child.value == observed:
            return predict(child.next, entry)
    # No branch for this value: report "no prediction" rather than returning
    # the split feature's name as if it were a class label.
    return None

In [136]:
from sklearn.model_selection import train_test_split
def eval_tree(data:pd.DataFrame):
    """Train an ID3 tree on 70% of `data` and print its accuracy on the other 30%.

    :param data: DataFrame with the feature columns and the 'irradiat' target
    :return: None; the accuracy is printed
    """
    # Split the frame that was passed in; the global `breast_cancer` was
    # previously used here, silently ignoring the argument.
    train, test = (
        part.reset_index(drop=True)
        for part in train_test_split(data, test_size=0.3)
    )
    tree = create_tree(train)
    tree.id3()
    size = len(test)
    score = 0.0
    for _, entry in test.iterrows():
        score += entry['irradiat'] == predict(tree.node, entry)
    print(score/size)

In [137]:
# Train and score the DecisionTreeClassifier-based tree on the prepared dataset.
eval_tree(data=breast_cancer)

0.7441860465116279
