In [2]:
import torch
import pandas
import numpy as np

In [None]:
import pandas
#dataset taken from https://www.kaggle.com/yashsawarn/wifi-stretgth-for-rooms

def read_dataset(csv_name = 'wifi_localization.txt'):
    """
    Reads a csv dataset
    returns it as a pytorch tensor
    """
    data_frame = pandas.read_table(csv_name, delim_whitespace=True, names=('A', 'B', 'C', 'D','E', 'F', 'G', 'ROOM'),
                       dtype={'A': np.int64, 'B': np.float64, 'C': np.float64, 'D': np.float64,'E': np.float64,'F': np.float64,'G': np.float64,'ROOM': np.float64})

    targets_torch = torch.tensor(data_frame['ROOM'].values)
    dataset_torch = torch.tensor(data_frame.values)

    return dataset_torch
dataset_torch = read_dataset()



In [None]:
print(dataset_torch)

tensor([[-64., -56., -61.,  ..., -82., -81.,   1.],
        [-68., -57., -61.,  ..., -85., -85.,   1.],
        [-63., -60., -60.,  ..., -85., -84.,   1.],
        ...,
        [-62., -59., -46.,  ..., -87., -88.,   4.],
        [-62., -58., -52.,  ..., -90., -85.,   4.],
        [-59., -50., -45.,  ..., -88., -87.,   4.]], dtype=torch.float64)


In [3]:
class Node_CART:
    def __init__(self, num_classes = 4, ref_CART = None, current_depth = 0):
        """
        Create the node attributes
        param num_classes: K number of classes to classify
        param ref_cart: reference to the tree containing the node
        param current_depth: current depth of the node in the tree
        """
        self.ref_CART = ref_CART
        self.threshold_value = 0
        self.feature_num = 0
        self.node_right = None
        self.node_left = None
        self.data_torch_partition = None
        self.gini = 0
        self.dominant_class = None
        self.accuracy_dominant_class = None
        self.num_classes = num_classes
        self.current_depth = current_depth

    def to_xml(self, current_str = ""):
        """
        Recursive function to write the node content to an xml formatted string
        param current_str : the xml content so far in the whole tree
        return the string with the node content
        """
        str_node = "<node><thresh>" + str(self.threshold_value) + "</thresh>" + "<feature>" + str(self.feature_num) + "</feature><depth>" + str(self.current_depth)+ "</depth>"
        str_node += "<gini>" + str(self.gini) + "</gini>"
        if(self.node_right != None):
            str_left = self.node_right.to_xml(current_str)
            str_node += str_left
        if(self.node_left != None):
            str_right = self.node_left.to_xml(current_str)
            str_node += str_right

        if(self.is_leaf()):
            str_node += "<dominant_class>" + str(self.dominant_class) + "</dominant_class><acc_dominant_class>"  + str(self.accuracy_dominant_class) + "</acc_dominant_class>"
        str_node += "</node>"
        return str_node

    def is_leaf(self):
        """
        Checks whether the node is a leaf
        """
        return (self.node_left == None and self.node_right == None)

    def create_with_children(self, data_torch, current_depth, list_selected_features = [], min_gini = 0.000001):
        """
        Creates a node by selecting the best feature and threshold, and if needed, creating its children
        param data_torch: dataset with the current partition to deal with in the node
        param current_depth: depth counter for the node
        param list_selected_features: list of selected features so far for the CART building process
        param min_gini: hyperparmeter selected by the user defining the minimum tolerated gini coefficient for a  node
        return the list of selected features so far
        """


        return list_selected_features


    def select_best_feature_and_thresh(self, data_torch, list_features_selected = [], num_classes = 4):
        """
        Selects the best feature and threshold that minimizes the gini coefficient
        param data_torch: dataset partition to analyze
        param list_features_selected list of features selected so far, thus must be ignored
        param num_classes: number of K classes to discriminate from
        return min_thresh, min_feature, min_gini found for the dataset partition when
        selecting the found feature and threshold
        """

        #TODO
        #return selected cut
        return (min_thresh, min_feature, min_gini)


    def calculate_gini(self, data_partition_torch: torch.tensor, num_classes: int = 4):
        """
        Calculates the gini coefficient for a given partition with the given number of classes
        param data_partition_torch: current dataset partition as a tensor
        param num_classes: K number of classes to discriminate from
        returns the calculated gini coefficient
        """
        COLUMN_INDEX = 7
        class_coef = torch.zeros(num_classes)
        data_size = data_partition_torch.shape[0]
        for gini_class in range(1, num_classes+1, 1):
            class_coef[gini_class-1] = ((data_partition_torch[:, COLUMN_INDEX] == gini_class).sum().item() / data_size)**2
        return torch.tensor(1.0 - class_coef.sum().item())

    def calculate_entropy(self, data_partition_torch, num_classes = 4):
        """
        Calculates the entropy for a given partition with the given number of classes
        param data_partition_torch: current dataset partition as a tensor
        param num_classes: K number of classes to discriminate from
        returns the calculated entropy
        """
        labels = data_partition_torch[:, -1]
        #print('labes: ', labels)
        # We dont have a 0 label.
        class_counts = torch.bincount(labels)[1:]
        #print('class counts: ', class_counts)
        p_k = (class_counts/labels.shape[0])
        entropy = -1*torch.sum(p_k*torch.log(p_k))
        #print('entropy: ', entropy)
        return entropy

    def evaluate_node(self, input_torch):
        """
        Evaluates an input observation within the node.
        If is not a leaf node, send it to the corresponding node
        return predicted label
        """
        feature_val_input = input_torch[self.feature_num]
        if(self.is_leaf()):
            return self.dominant_class
        else:
            if(feature_val_input < self.threshold_value):
                return self.node_left.evaluate_node(input_torch)
            else:
                return self.node_right.evaluate_node(input_torch)


class CART:
    def __init__(self, dataset_torch, max_CART_depth, min_observations = 2):
        """
        CART has only one root node
        """
        #min observations per node
        self.min_observations = min_observations
        self.root = Node_CART(num_classes = 4, ref_CART = self, current_depth = 0)
        self.max_CART_depth = max_CART_depth
        self.list_selected_features = []

    def get_root(self):
        """
        Gets tree root
        """
        return self.root

    def get_min_observations(self):
        """
        return min observations per node
        """
        return self.min_observations

    def get_max_depth(self):
        """
        Gets the selected max depth of the tree
        """
        return self.max_CART_depth

    def build_CART(self, data_torch):
        """
        Build CART from root
        """
        self.list_selected_features = self.root.create_with_children(data_torch, current_depth = 0)

    def to_xml(self, xml_file_name):
        """
        write Xml file with tree content
        """
        str_nodes = self.root.to_xml()
        file = open(xml_file_name,"w+")
        file.write(str_nodes)
        file.close()
        return str_nodes


    def evaluate_input(self, input_torch):
        """
        Evaluate a specific input in the tree and get the predicted class
        """
        return self.root.evaluate_node(input_torch)


def train_CART(dataset_torch, name_xml = "", max_CART_depth = 3, min_obs_per_leaf = 2):
    """
    Train CART model
    """
    tree = CART(dataset_torch = dataset_torch, max_CART_depth = max_CART_depth, min_observations =  min_obs_per_leaf)
    tree.build_CART(dataset_torch)
    if(not name_xml == ""):
        tree.to_xml(name_xml)
    return tree

def test_CART(tree, testset_torch):
    """
    Test a previously built CART
    """
    #TODO, use tree.evaluate_input(current_observation) for this
    return accuracy



#tree = train_CART(dataset_torch, name_xml = "CART_example.xml")
#acc = test_CART(tree, dataset_torch)





## Gini Unittest

In [4]:
import unittest

class GiniUnitTest(unittest.TestCase):

    def test_singleClassOneData(self):
      data = torch.tensor([[-64, -56,	-61,	-66,	-71,	-82,	-81,	1]])
      node = Node_CART()
      gini_result = node.calculate_gini(data, num_classes=1)
      self.assertTrue(torch.equal(gini_result, torch.tensor(0.0)))

    def test_twoClassesOneDataPerClass(self):
      data = torch.tensor([[-64, -56,	-61,	-66,	-71,	-82,	-81,	1], [-64, -56,	-61,	-66,	-71,	-82,	-81,	2]])
      node = Node_CART()
      gini_result = node.calculate_gini(data, num_classes=2)
      self.assertTrue(torch.equal(gini_result, torch.tensor(0.5)))

    def test_twoClassesOnlyOneClassWithData(self):
      data = torch.tensor([[-64, -56,	-61,	-66,	-71,	-82,	-81,	1], [-64, -56,	-61,	-66,	-71,	-82,	-81,	1]])
      node = Node_CART()
      gini_result = node.calculate_gini(data, num_classes=2)
      self.assertTrue(torch.equal(gini_result, torch.tensor(0.0)))

    def test_fourClassesOneDataPerClass(self):
      data = torch.tensor([[-64, -56,	-61,	-66,	-71,	-82,	-81,	1], [-64, -56,	-61,	-66,	-71,	-82,	-81,	2], [-64, -56,	-61,	-66,	-71,	-82,	-81,	3], [-64, -56,	-61,	-66,	-71,	-82,	-81,	4]])
      node = Node_CART()
      gini_result = node.calculate_gini(data, num_classes=4)
      self.assertTrue(torch.equal(gini_result, torch.tensor(0.75)))

    def test_fourClassesOnlyTwoClassesWithData(self):
      data = torch.tensor([[-64, -56,	-61,	-66,	-71,	-82,	-81,	1], [-64, -56,	-61,	-66,	-71,	-82,	-81,	1], [-64, -56,	-61,	-66,	-71,	-82,	-81,	4], [-64, -56,	-61,	-66,	-71,	-82,	-81,	4]])
      node = Node_CART()
      gini_result = node.calculate_gini(data, num_classes=4)
      self.assertTrue(torch.equal(gini_result, torch.tensor(0.5)))

    def test_fourClassesOnlyThreeClassesWithData(self):
      data = torch.tensor([[-64, -56,	-61,	-66,	-71,	-82,	-81,	1], [-64, -56,	-61,	-66,	-71,	-82,	-81,	1], [-64, -56,	-61,	-66,	-71,	-82,	-81,	3], [-64, -56,	-61,	-66,	-71,	-82,	-81,	4]])
      node = Node_CART()
      gini_result = node.calculate_gini(data, num_classes=4)
      self.assertTrue(torch.equal(gini_result, torch.tensor(0.625)))

unittest.main(argv=[''], verbosity=2, exit=False)

test_fourClassesOneDataPerClass (__main__.GiniUnitTest) ... ok
test_fourClassesOnlyThreeClassesWithData (__main__.GiniUnitTest) ... ok
test_fourClassesOnlyTwoClassesWithData (__main__.GiniUnitTest) ... ok
test_singleClassOneData (__main__.GiniUnitTest) ... ok
test_twoClassesOneDataPerClass (__main__.GiniUnitTest) ... ok
test_twoClassesOnlyOneClassWithData (__main__.GiniUnitTest) ... ok

----------------------------------------------------------------------
Ran 6 tests in 0.176s

OK


<unittest.main.TestProgram at 0x781f6c376f80>

# Entropy Unit Test

In [10]:
class EntropyUnitTest(unittest.TestCase):

    def test_oneClassOneData(self):
      data = torch.tensor([[-64, -56,	-61,	-66,	-71,	-82,	-81,	1]])
      node = Node_CART()
      entropy_result = node.calculate_entropy(data, num_classes=1)
      self.assertTrue(torch.equal(entropy_result, torch.tensor(0.0)))
    def test_twoClassesOneDataPerClass(self):
      data = torch.tensor([[-64, -56,	-61,	-66,	-71,	-82,	-81,	1],[-64, -56,	-61,	-66,	-71,	-82,	-81,	2]])
      node = Node_CART()
      entropy_result = node.calculate_entropy(data, num_classes=1)
      self.assertTrue(torch.equal(entropy_result, torch.log(torch.tensor(2))))
    def test_fourClassesOneDataPerClass(self):
      data = torch.tensor([[-64, -56,	-61,	-66,	-71,	-82,	-81,	1], [-64, -56,	-61,	-66,	-71,	-82,	-81,	2], [-64, -56,	-61,	-66,	-71,	-82,	-81,	3], [-64, -56,	-61,	-66,	-71,	-82,	-81,	4]])
      node = Node_CART()
      entropy_result = node.calculate_entropy(data, num_classes=4)
      self.assertTrue(torch.equal(entropy_result, torch.log(torch.tensor(4))))

unittest.main(argv=[''], verbosity=2, exit=False)

test_fourClassesOneDataPerClass (__main__.EntropyUnitTest) ... ok
test_oneClassOneData (__main__.EntropyUnitTest) ... ok
test_twoClassesOneDataPerClass (__main__.EntropyUnitTest) ... ok
test_fourClassesOneDataPerClass (__main__.GiniUnitTest) ... ok
test_fourClassesOnlyThreeClassesWithData (__main__.GiniUnitTest) ... ok
test_fourClassesOnlyTwoClassesWithData (__main__.GiniUnitTest) ... ok
test_singleClassOneData (__main__.GiniUnitTest) ... ok
test_twoClassesOneDataPerClass (__main__.GiniUnitTest) ... ok
test_twoClassesOnlyOneClassWithData (__main__.GiniUnitTest) ... ok

----------------------------------------------------------------------
Ran 9 tests in 0.040s

OK


labes:  tensor([1, 2, 3, 4])
class counts:  tensor([1, 1, 1, 1])
entropy:  tensor(1.3863)
labes:  tensor([1])
class counts:  tensor([1])
entropy:  tensor(-0.)
labes:  tensor([1, 2])
class counts:  tensor([1, 1])
entropy:  tensor(0.6931)


<unittest.main.TestProgram at 0x781f6c374820>

In [None]:
dataset_example = torch.tensor([[3, 22.0, 7.2, 0], [1, 38, 71.3, 0], [3, 26, 7.9, 1], [1, 35, 53.1, 0]])
print("dataset_example \n", dataset_example)


dataset_example 
 tensor([[ 3.0000, 22.0000,  7.2000,  0.0000],
        [ 1.0000, 38.0000, 71.3000,  0.0000],
        [ 3.0000, 26.0000,  7.9000,  1.0000],
        [ 1.0000, 35.0000, 53.1000,  0.0000]])


In [None]:
root_node = Node_CART()
root_node.data_torch_partition = dataset_example

print("root node \n ", root_node.data_torch_partition)
xml_root_node = root_node.to_xml()

print("xml_root_node \n", xml_root_node)

root node 
  tensor([[ 3.0000, 22.0000,  7.2000,  0.0000],
        [ 1.0000, 38.0000, 71.3000,  0.0000],
        [ 3.0000, 26.0000,  7.9000,  1.0000],
        [ 1.0000, 35.0000, 53.1000,  0.0000]])
xml_root_node 
 <node><thresh>0</thresh><feature>0</feature><depth>0</depth><gini>0</gini><dominant_class>None</dominant_class><acc_dominant_class>None</acc_dominant_class></node>


In [None]:
root_node.threshold_value = 3
root_node.feature_num = 0

#indices of left and right partitions
left_idxs = dataset_example[:, root_node.feature_num] < root_node.threshold_value
right_idxs = dataset_example[:, root_node.feature_num] >= root_node.threshold_value
#data partitions
dataset_partition_left = dataset_example[left_idxs]
dataset_partition_right = dataset_example[right_idxs]

print("dataset_partition_left \n", dataset_partition_left)
print("dataset_partition_right \n", dataset_partition_right)
#create left child
left_child = Node_CART(current_depth = 1)
left_child.data_torch_partition = dataset_partition_left
root_node.node_left = left_child
#create right child
right_child = Node_CART(current_depth = 1)
right_child.data_torch_partition = dataset_partition_right
root_node.node_right = right_child
#write xml example
root_node.ref_CART = root_node
xml_string = root_node.to_xml()

#print(xml_string)
file = open("example1.xml", "a")
file.write(xml_string)
file.close()


dataset_partition_left 
 tensor([[ 1.0000, 38.0000, 71.3000,  0.0000],
        [ 1.0000, 35.0000, 53.1000,  0.0000]])
dataset_partition_right 
 tensor([[ 3.0000, 22.0000,  7.2000,  0.0000],
        [ 3.0000, 26.0000,  7.9000,  1.0000]])


In [None]:
print(xml_string)

<node><thresh>3</thresh><feature>0</feature><depth>0</depth><gini>0</gini><node><thresh>0</thresh><feature>0</feature><depth>1</depth><gini>0</gini><dominant_class>None</dominant_class><acc_dominant_class>None</acc_dominant_class></node><node><thresh>0</thresh><feature>0</feature><depth>1</depth><gini>0</gini><dominant_class>None</dominant_class><acc_dominant_class>None</acc_dominant_class></node></node>


In [None]:
def calculate_gini(node_left, node_right):
  size_left = node_left.data_torch_partition.shape[0]
  size_right = node_right.data_torch_partition.shape[0]
  size_total = size_left + size_right
  gini_left = 1 - (((node_left.data_torch_partition[:, 3] == 0).sum().item() / size_left)**2 + ((node_left.data_torch_partition[:, 3] == 1).sum().item()/size_left)**2)
  gini_right = 1 - (((node_right.data_torch_partition[:, 3] == 0).sum().item() / size_right)**2 + ((node_right.data_torch_partition[:, 3] == 1).sum().item()/size_right)**2)
  gini_total = (size_left / size_total * gini_left) + (size_right / size_total * gini_right)
  print("gini_left: ", gini_left, "gini_right: ", gini_right, "gini_total: ", gini_total)
  return gini_total
gini_total = calculate_gini(left_child, right_child)
print("gini_total ", gini_total)

gini_left:  0.0 gini_right:  0.5 gini_total:  0.25
gini_total  0.25
