In [2]:
import torch
import pandas
import numpy as np

In [3]:
import pandas
#dataset taken from https://www.kaggle.com/yashsawarn/wifi-stretgth-for-rooms

def read_dataset(csv_name = 'wifi_localization.txt'):
    """
    Reads a csv dataset 
    returns it as a pytorch tensor
    """
    data_frame = pandas.read_table(csv_name, delim_whitespace=True, names=('A', 'B', 'C', 'D','E', 'F', 'G', 'ROOM'),
                       dtype={'A': np.int64, 'B': np.float64, 'C': np.float64, 'D': np.float64,'E': np.float64,'F': np.float64,'G': np.float64,'ROOM': np.float64})

    targets_torch = torch.tensor(data_frame['ROOM'].values)
    dataset_torch = torch.tensor(data_frame.values)
    print("dataset torch ", dataset_torch.shape)
    return dataset_torch
dataset_torch = read_dataset()


dataset torch  torch.Size([2000, 8])


In [91]:
class Node_CART:    
    def __init__(self, num_classes = 4, ref_CART = None, current_depth = 0):
        """
        Create the node attributes
        param num_classes: K number of classes to classify
        param ref_cart: reference to the tree containing the node
        param current_depth: current depth of the node in the tree
        """
        self.ref_CART = ref_CART
        self.threshold_value = 0
        self.feature_num = 0
        self.node_right = None
        self.node_left = None
        self.data_torch_partition = None
        self.gini = 0
        self.dominant_class = None
        self.accuracy_dominant_class = None        
        self.num_classes = num_classes
        self.current_depth = current_depth
    
    def to_xml(self, current_str = ""):
        """
        Recursive function to write the node content to an xml formatted string
        param current_str : the xml content so far in the whole tree
        return the string with the node content
        """
        str_node = "<node><thresh>" + str(self.threshold_value) + "</thresh>" + "<feature>" + str(self.feature_num) + "</feature><depth>" + str(self.current_depth)+ "</depth>" 
        str_node += "<gini>" + str(self.gini) + "</gini>"
        if(self.node_right != None):
            str_left = self.node_right.to_xml(current_str)
            str_node += str_left
        if(self.node_left != None):
            str_right = self.node_left.to_xml(current_str)
            str_node += str_right
            
        if(self.is_leaf()):
            str_node += "<dominant_class>" + str(self.dominant_class) + "</dominant_class><acc_dominant_class>"  + str(self.accuracy_dominant_class) + "</acc_dominant_class>"
        str_node += "</node>"
        return str_node
    
    def is_leaf(self):
        """
        Checks whether the node is a leaf
        """
        return (self.node_left == None and self.node_right == None)
    
    def create_with_children(self, data_torch, current_depth, list_selected_features = [], min_gini = 0.000001):
        """
        Creates a node by selecting the best feature and threshold, and if needed, creating its children
        param data_torch: dataset with the current partition to deal with in the node
        param current_depth: depth counter for the node
        param list_selected_features: list of selected features so far for the CART building process
        param min_gini: hyperparmeter selected by the user defining the minimum tolerated gini coefficient for a  node
        return the list of selected features so far
        """        
        #update depth of children
        depth_children = current_depth + 1
      
        if(depth_children <= self.ref_CART.get_max_depth()):
            num_observations = data_torch.shape[0]            
            #careful with max depth
            #if no threshold and feature were selected, select it using a greedy approach            
            (threshold_value, feature_num, gini) = self.select_best_feature_and_thresh(data_torch, list_features_selected = [])
            list_selected_features += [feature_num]
            #store important data in attributes
            self.threshold_value = threshold_value
            self.feature_num = feature_num
            self.data_torch_partition = data_torch
            self.gini = gini            
            num_features = data_torch.shape[1]

            #create the right and left node data if the current gini is still high            
            if(self.gini > min_gini): 
                data_torch_left = data_torch[data_torch[:, feature_num] < threshold_value]
                data_torch_right = data_torch[data_torch[:, feature_num] >= threshold_value]

                #if the new partitions have more than min_observations, make them
                if(data_torch_left.shape[0] >= self.ref_CART.get_min_observations() and data_torch_right.shape[0] >= self.ref_CART.get_min_observations()):
                    #add data to the right and left children

                    self.node_right = Node_CART(num_classes = self.num_classes, ref_CART = self.ref_CART, current_depth = depth_children)
                    self.node_left = Node_CART(num_classes = self.num_classes, ref_CART = self.ref_CART, current_depth = depth_children)
                    list_selected_features = self.node_right.create_with_children(data_torch_right, depth_children, list_selected_features = list_selected_features)            
                    self.node_left.create_with_children( data_torch_left, depth_children, list_selected_features = list_selected_features)
        #if is leaf, fill the         
        if(self.is_leaf()):            
            labels_data = data_torch[:,  -1]
            self.dominant_class = torch.mode(labels_data).values.item()
            num_obs_label = labels_data[labels_data == self.dominant_class].shape[0]
            self.accuracy_dominant_class = num_obs_label / labels_data.shape[0]           
            
        return list_selected_features
    
    
    def select_best_feature_and_thresh(self, data_torch, list_features_selected = [], num_classes = 4):
        """
        Selects the best feature and threshold that minimizes the gini coefficient
        param data_torch: dataset partition to analyze
        param list_features_selected list of features selected so far, thus must be ignored 
        param num_classes: number of K classes to discriminate from 
        return min_thresh, min_feature, min_gini found for the dataset partition when 
        selecting the found feature and threshold
        """       
        data_partition_torch = data_torch
        min_gini = 10
        threshold = 0
        min_feature = 0
        
        print('data_partition_torch', data_partition_torch)
        
        for feature_id in range(3):
            print("------------- Feature id: {} -------------".format(feature_id))
            # Get all the values of the column
            column_values = data_partition_torch[:,feature_id]
            
            # Sort tensor in ascending order
            values_sorted, _ = torch.sort(column_values)
            print('values_sorted', values_sorted)
            
            # Get the first value, and ignore it in the for.
            threshold_tmp = values_sorted[0].item()
            for value in values_sorted[1:]:
                print("------------- Feature id: {}, Value: {} -------------".format(feature_id, value.item()))
                if threshold_tmp == value.item():
                    print("Continue value:", value.item())
                    continue
                threshold_tmp = value.item()
                
                data_torch_left = data_partition_torch[data_partition_torch[:, feature_id] < threshold_tmp]
                data_torch_right = data_partition_torch[data_partition_torch[:, feature_id] >= threshold_tmp]
                
                print('Data < {}: {}'.format(value.item(), data_torch_left))
                print('Data > {}: {}'.format(value.item(), data_torch_right))
                
                left_entropy = self.calculate_entropy(data_torch_left[:,-1],num_classes=2)
                right_entropy = self.calculate_entropy(data_torch_right[:,-1],num_classes=2)
                
                print('left_entropy: ', left_entropy)
                print('right_entropy: ', right_entropy)
                
                # Calculate ponderate
                ponderate = self.get_ponderate(data_torch_left.shape[0], data_torch_right.shape[0], data_partition_torch.shape[0], left_entropy, right_entropy)
                print('Ponderate: ',ponderate)
                
                if ponderate < min_gini:
                    min_gini = ponderate
                    threshold = threshold_tmp
                    min_feature = feature_id


        #self.calculate_entropy(data_partition_torch)
        #self.calculate_gini(data_partition_torch)
        
        print('min_gini: ',min_gini)
        print('threshold: ',threshold)
        print('min_feature: ',min_feature)
        print('-----------------------------------------------------------------')
        return (threshold, min_feature, min_gini) 
    
    
    def calculate_gini(self, data_partition_torch, num_classes = 4):
        """
        Calculates the gini coefficient for a given partition with the given number of classes
        param data_partition_torch: current dataset partition as a tensor
        param num_classes: K number of classes to discriminate from
        returns the calculated gini coefficient
        """
        
        
        # Calculate the density of each class
        density = self.get_density(data_partition_torch, num_classes)
        
        # Calculate the entropy using the density array
        gini =  1 -  np.dot( density ,  density )
        print("gini:", gini)
        
        return gini
         
    
    def calculate_entropy(self, data_partition_torch, num_classes = 4):
        """
        Calculates the gini coefficient for a given partition with the given number of classes
        param data_partition_torch: current dataset partition as a tensor
        param num_classes: K number of classes to discriminate from
        returns the calculated entropy coefficient
        
        data_partition_torch = [1, 2, 2, 2, 3, 3, 3, 3, 3, 4]
        
        """
        
        # Calculate the density of each class
        density = self.get_density(data_partition_torch, num_classes)
        
        # Calculate the entropy using the density array
        entropy =  -1* np.dot( density ,  np.log2(density) )
        
        return entropy
    
    def get_density(self, data_partition_torch, num_classes):
        """
        Calculates the density of each class
        param data_partition_torch: current dataset partition as a tensor
        param num_classes: K number of classes
        returns: the calculated density array
        
        data_partition_torch = [1, 2, 2, 2, 3, 3, 3, 3, 3, 4]
        
        """
        data_len = data_partition_torch.shape[0]
        
        # Get the amount of each class from the data_partition_torch
        # Example of output: [0, 2, 3, 5, 6], ignore the first element
        
        values = torch.bincount(data_partition_torch.int(),minlength=num_classes)
        #values = torch.bincount(data_partition_torch.int(),minlength=num_classes+1)
        
        # Calculate the density of each class
        density = []
        
        #for value in values[1:]:
        for value in values[0:]:
            data = value/data_len + 0.000001
            if data == 0:
                data += 0.000001
            density.append(data)
        
        return density
    
    def get_ponderate(self, nl, nr, n, hl, hr):
        """
        Return the ponderate data.
        param nl: Number left observations
        param nr: Number riht observations
        param n: Total of observations
        param hl: Left value (entropy or gini)
        param hd: Right value (entropy or gini)
        
        """
        return (nr/n)*hr + (nl/n)*hl  
    
    def evaluate_node(self, input_torch): 
        """
        Evaluates an input observation within the node. 
        If is not a leaf node, send it to the corresponding node
        return predicted label
        """
        feature_val_input = input_torch[self.feature_num]
        if(self.is_leaf()):
            return self.dominant_class
        else:
            if(feature_val_input < self.threshold_value):
                return self.node_left.evaluate_node(input_torch)
            else:
                return self.node_right.evaluate_node(input_torch)
        

class CART:
    def __init__(self, dataset_torch, max_CART_depth, min_observations = 2):
        """
        CART has only one root node
        """
        #min observations per node
        self.min_observations = min_observations
        self.root = Node_CART(num_classes = 4, ref_CART = self, current_depth = 0)
        self.max_CART_depth = max_CART_depth
        self.list_selected_features = []
        
    def get_root(self):
        """
        Gets tree root
        """
        return self.root
    
    def get_min_observations(self):
        """
        return min observations per node
        """
        return self.min_observations
    
    def get_max_depth(self):
        """
        Gets the selected max depth of the tree
        """
        return self.max_CART_depth
    
    def build_CART(self, data_torch):
        """
        Build CART from root
        """
        self.list_selected_features = self.root.create_with_children(data_torch, current_depth = 0)
    
    def to_xml(self, xml_file_name):
        """
        write Xml file with tree content
        """
        str_nodes = self.root.to_xml()
        file = open(xml_file_name,"w+") 
        file.write(str_nodes)
        file.close()
        return str_nodes
    
    
    def evaluate_input(self, input_torch):
        """
        Evaluate a specific input in the tree and get the predicted class
        """
        return self.root.evaluate_node(input_torch)
        
    
def train_CART(dataset_torch, name_xml = "", max_CART_depth = 3, min_obs_per_leaf = 2): 
    """
    Train CART model
    """
    tree = CART(dataset_torch = dataset_torch, max_CART_depth = max_CART_depth, min_observations =  min_obs_per_leaf)
    tree.build_CART(dataset_torch)
    if(not name_xml == ""):
        tree.to_xml(name_xml)
    return tree

def test_CART(tree, testset_torch):
    """
    Test a previously built CART
    """
    n = testset_torch.shape[0]
    c = 0
    for value in testset_torch:
        result = tree.evaluate_input(value)
        if result == value[-1]:
            c += 1
    return c/n


data_test = torch.tensor([[3, 22, 7.2, 0], [1, 38, 71.3, 0], [3, 26, 7.9, 1],[1, 35, 53.1, 0]])        
testset = torch.tensor([[3, 21, 7.4, 0],[1, 40, 70.4, 0], [3, 27, 9, 1]]) 
tree = train_CART(data_test, name_xml = "CART_example.xml")
acc = test_CART(tree, testset)

print("Accuracy:", acc)
        

    
        

data_partition_torch tensor([[ 3.0000, 22.0000,  7.2000,  0.0000],
        [ 1.0000, 38.0000, 71.3000,  0.0000],
        [ 3.0000, 26.0000,  7.9000,  1.0000],
        [ 1.0000, 35.0000, 53.1000,  0.0000]])
------------- Feature id: 0 -------------
values_sorted tensor([1., 1., 3., 3.])
------------- Feature id: 0, Value: 1.0 -------------
Continue value: 1.0
------------- Feature id: 0, Value: 3.0 -------------
Data < 3.0: tensor([[ 1.0000, 38.0000, 71.3000,  0.0000],
        [ 1.0000, 35.0000, 53.1000,  0.0000]])
Data > 3.0: tensor([[ 3.0000, 22.0000,  7.2000,  0.0000],
        [ 3.0000, 26.0000,  7.9000,  1.0000]])
left_entropy:  1.855570553743746e-05
right_entropy:  0.9999991059303284
Ponderate:  0.5000088308179329
------------- Feature id: 0, Value: 3.0 -------------
Continue value: 3.0
------------- Feature id: 1 -------------
values_sorted tensor([22., 26., 35., 38.])
------------- Feature id: 1, Value: 26.0 -------------
Data < 26.0: tensor([[ 3.0000, 22.0000,  7.2000,  0.0000]]