# Assignment 2 - Part B: Building a decision tree

This is a skeleton of a decision tree classifier for the example data set in `data/example.csv`.

In [44]:
import csv
import math
from statistics import median, mode
from collections import Counter
from enum import Enum

Some simple type definitions.

In [45]:
class AttrType(Enum):
    cat = 0  # categorical (qualitative) attribute
    num = 1  # numerical (quantitative) attribute
    target = 2  # target label


class NodeType(Enum):
    root = 0
    internal = 1
    leaf = 2


class SplitType(Enum):
    bin = 0  # binary split
    multi = 1  # multi-way split

Also, some basic classes to represent an attribute, a spltting procedure, and a node.

In [46]:
class Attribute(object):
    def __init__(self, label, type):
        assert type in AttrType
        self.label = label
        self.type = type
        self.stat = None  # holds mean for numerical and mode for categorical attributes


class Splitting(object):
    def __init__(self, attr, infogain, split_type, cond, splits):
        self.attr = attr  # attribute ID (index in ATTR)
        self.infogain = infogain  # information gain if splitting is done on this attribute
        self.split_type = split_type  # one of SplitType
        self.cond = cond  # splitting condition, i.e., values on outgoing edges
        self.splits = splits  # list of training records (IDs) for each slitting condition


class Node(object):
    def __init__(self, id, type, parent_id, children=None, edge_value=None, val=None, split_type=None, split_cond=None,
                 infogain=None):
        self.id = id  # ID (same as the index in DT.model list)
        self.type = type  # one of NodeType
        self.parent_id = parent_id  # ID of parent node (None if root)
        self.children = children  # list of IDs of child nodes
        self.edge_value = edge_value  # the value of the incoming edge (only if not root node)
        self.val = val  # if root or internal node: the attribute that is compared at that node; if leaf node: the target value
        self.split_type = split_type  # one of SplitType
        self.split_cond = split_cond  # splitting condition (median value for binary splits on numerical values; otherwise a list of categorical values (corresponding to child nodes))
        self.infogain = infogain

    def append_child(self, node_id):
        self.children.append(node_id)

The input filename is hard-coded.

In [47]:
INFILE = "data/basketball.train.csv"
TESTFILE = "data/basketball.test.csv"
OUTFILE = "data/basketball.prediction.csv"

The attribute labels types are hard-coded too (the same order as in the file!).

In [48]:
ATTR = [Attribute("LOCATION", AttrType.cat), Attribute("W", AttrType.cat),
        Attribute("FINAL_MARGIN", AttrType.num), Attribute("SHOT_NUMBER", AttrType.num), Attribute("PERIOD", AttrType.cat), 
        Attribute("GAME_CLOCK", AttrType.num), Attribute("SHOT_CLOCK", AttrType.num), Attribute("DRIBBLES", AttrType.num), 
        Attribute("TOUCH_TIME", AttrType.num), Attribute("SHOT_DIST", AttrType.num), Attribute("PTS_TYPE", AttrType.cat), 
        Attribute("CLOSE_DEF_DIST", AttrType.num), Attribute("Target", AttrType.target)]

The index of the target attribute (assuming it's the last).

In [49]:
IDX_TARGET = len(ATTR) - 1 

A main class DT representing the decision tree classifier. It could represent with methods:

  - a given inpurity measure;
  - the search for the best attribute to split with;
  - the addition of a node to the tree;
  - a convenient model printer;
  - the recursive call for obtaining a tree;
  - a builder and an applier.

In [50]:
class DT(object):
    def __init__(self):
        self.data = None  # training data set (loaded into memory)
        self.model = None  # decision tree model
        self.default_class = None  # default target class

    def __load_data(self):
        with open(INFILE) as csvfile:
            self.data = []
            csvreader = csv.reader(csvfile, delimiter=',')
            for row in csvreader:
                rec = []
                for i in range(len(ATTR)):
                    val = row[i].strip()
                    # convert numerical attributes
                    if ATTR[i].type == AttrType.num:  # Note that this will break for "?" (missing attribute)
                        val = float(val)
                    rec.append(val)
                self.data.append(rec)
                # self.data.append([element.strip() for element in row])  # strip spaces
                
    def __median(self, a):
        return median(value[a] for value in self.data)
    
    def __gain(self, records, split, entropy):
        made = sum([1 for record in split if self.data[record][IDX_TARGET] == "made"])
        missed = sum([1 for record in split if self.data[record][IDX_TARGET] == "missed"])
        return ((made + missed) / len(records)) * entropy
        
    
    def __entropy(self, records):
        """
        Calculates entropy for a selection of records.

        :param records: Data records (given by indices)
        """
        made = sum([1 for record in records if self.data[record][IDX_TARGET] == "made"])
        missed = sum([1 for record in records if self.data[record][IDX_TARGET] == "missed"])
        if(made == 0 and missed == 0):
            return 1
        elif(made == 0 or missed == 0):
            return 0
        entropy = -(missed / (made + missed) * math.log2(missed / (made + missed))) - (made / (made + missed) * math.log2(made / (made + missed)))
        return entropy

    def __find_best_attr(self, attrs, records):
        """
        Finds the attribute with the largest gain.

        :param attrs: Set of attributes
        :param records: Training set (list of record ids)
        :return:
        """
        entropy_p = self.__entropy(records)  # parent's entropy
        splittings = []  # holds the splitting information for each attribute
        
        for a in attrs:
            assert ATTR[a].type in AttrType
            splits = {}  # record IDs corresponding to each split
            # splitting condition depends on the attribute type
            if ATTR[a].type == AttrType.target:  # skip target attribute
                continue
            elif ATTR[a].type == AttrType.cat:  # categorical attribute
                # multi-way split on each possible value
                split_mode = SplitType.multi
                # each possible attr value corresponds to a split (indexed with categorical labels)
                # Note: it's important to consider attr values from the entire training set
                split_cond = set([self.data[idx][a] for idx in range(len(self.data))])
                
                # `splits[val]` holds a list of records for a given split,
                # where `val` is an element of `split_cond`
                for val in split_cond:
                    splits[val] = [record for record in records if self.data[record][a] == val]
                
            elif ATTR[a].type == AttrType.num:  # numerical attribute => binary split on median value
                split_mode = SplitType.bin
                split_cond = self.__median(a)  # (i.e., if less or equal than this value)
                # TODO collect training records for each split (in `splits`)
                # Splitting condition is the median of the numeric attribute
                # in the set of data we get (records). This turns it into a binary split
                # Eg. greater than median = add to one bucket, less than or equal = add to the other
                greater = [record for record in records if self.data[record][a] > split_cond]
                lowequal = [record for record in records if self.data[record][a] <= split_cond]
                
                splits['top'] = greater
                splits['bottom'] = lowequal
            infogain = entropy_p
            
            for split in splits:
                entropy = self.__entropy(splits[split])
                infogain -= self.__gain(records, splits[split], entropy)
                
            splitting = Splitting(a, infogain, split_mode, split_cond, splits)
            splittings.append(splitting)

        # find best splitting
        best_splitting = sorted(splittings, key=lambda x: x.infogain, reverse=True)[0]
        return best_splitting

    def __add_node(self, parent_id, node_type=NodeType.internal, edge_value=None, val=None, split_type=None,
                   split_cond=None):
        """
        Adds a node to the decision tree.

        :param parent_id:
        :param node_type:
        :param edge_value:
        :param val:
        :param split_type:
        :param split_cond:
        :return:
        """
        node_id = len(self.model)  # id of the newly assigned node
        if not self.model:  # the tree is empty
            node_type = NodeType.root

        node = Node(node_id, node_type, parent_id, children=[], edge_value=edge_value, val=val, split_type=split_type,
                    split_cond=split_cond)
        self.model.append(node)

        # also add it as a child of the parent node
        if parent_id is not None:
            self.model[parent_id].append_child(node_id)

        return node_id

    def __id3(self, attrs, records, parent_id=None, value=None):
        """
        Function ID3 that returns a decision tree.

        :param attrs: Set of attributes
        :param records: Training set (list of record ids)
        :param parent_id: ID of parent node
        :param value: Value corresponding to the parent attribute, i.e., label of the edge on which we arrived to this node
        :return:
        """
        # empty training set or empty set of attributes => create leaf node with default class
        if not records or not attrs:
            self.__add_node(parent_id, node_type=NodeType.leaf, edge_value=value, val=self.default_class)
            return

        # if all records have the same target value => create leaf node with that target value
        same = all(self.data[idx][IDX_TARGET] == self.data[records[0]][IDX_TARGET] for idx in records)
        if same:
            target = self.data[records[0]][IDX_TARGET]
            self.__add_node(parent_id, node_type=NodeType.leaf, edge_value=value, val=target)
            return
        
        if len(records) < 1000:
            classes = [self.data[record][IDX_TARGET] for record in records]
            """ mode() returns the most common value in a set of data. If the set is empty or there is not exactly one most
                common value, it returns a StatisticsError."""
            try:
                default_class = mode(classes)
            except:
                default_class = self.default_class
            if(default_class  == "made"):
                self.__add_node(parent_id, node_type=NodeType.leaf, edge_value=value, val="made")
            elif(default_class == "missed"):
                self.__add_node(parent_id, node_type=NodeType.leaf, edge_value=value, val="missed")
            return
            
        
        # find the attribute with the largest gain
        splitting = self.__find_best_attr(attrs, records)
        # add node
        node_id = self.__add_node(parent_id, edge_value=value, val=splitting.attr, split_type=splitting.split_type,
                                  split_cond=splitting.cond)
        # TODO call tree construction recursively for each split
        newA = list(attrs)
        newA.remove(splitting.attr)
        # First we check what type of split we're dealing with
        if(splitting.split_type == SplitType.multi):
            # On SplitType.multi we can have multiple splitting conditions, ex. Win/Loss, PTS_TYPE, 1, 2, 3
            for condition in splitting.cond:
                # Recursively call a decision tree for each condition with a list fullfilling the condition
                self.__id3(newA, [record for record in records if self.data[record][splitting.attr] == condition], parent_id=node_id, value=condition)
                
        if(splitting.split_type == SplitType.bin):
            greater = [record for record in records if self.data[record][splitting.attr] > splitting.cond]
            lowequal = [record for record in records if self.data[record][splitting.attr] <= splitting.cond]
            self.__id3(newA, greater, parent_id=node_id, value=False)
            self.__id3(newA, lowequal, parent_id=node_id, value=True)

    def print_model(self, node_id=0, level=0):
        node = self.model[node_id]
        indent = "  " * level
        if node.type == NodeType.leaf:
            print(indent + str(node.edge_value) + " [Leaf node] class=" + node.val)
        else:
            cond = " <= " + str(node.split_cond) if ATTR[node.val].type == AttrType.num else " == ? "
            if node.type == NodeType.root:
                print("[Root node] '" + ATTR[node.val].label + "'" + cond)
            else:
                print(indent + str(node.edge_value) + " [Internal node] '" + ATTR[node.val].label + "'" + cond)
            # print tree for child notes recursively
            for n_id in node.children:
                self.print_model(n_id, level + 1)

    def build_model(self):
        self.__load_data()
        self.model = []  # holds the decision tree model, represented as a list of nodes
        # Get majority class
        #   Note: Counter returns a dictionary, most_common(x) returns a list with the x most common elements as
        #         (key, count) tuples; we need to take the first element of the list and the first element of the tuple
        self.default_class = Counter([x[IDX_TARGET] for x in self.data]).most_common(1)[0][0]
        self.__id3(set(range(len(ATTR) - 1)), list(range(len(self.data))))

    def apply_model(self, record):
        node = self.model[0]
        while node.type != NodeType.leaf:
            if ATTR[node.val].type == AttrType.num:
                edge = False if record[node.val] > node.split_cond else True
            else:
                edge = record[node.val]
                
            for child in node.children:
                if self.model[child].edge_value == edge:
                    node = self.model[child]
            
        return node.val

Finally, the main function building a decision tree model, printing it and applying it on some unseen records.

In [51]:
def main():
    dt = DT()
    print("Build model:")
    dt.build_model()
    dt.print_model()

    test_data = []
    with open(TESTFILE) as csvfile:
        csvreader = csv.reader(csvfile, delimiter=',')
        for row in csvreader:
            rec = []
            for i in range(len(ATTR) - 1):
                val = row[i].strip()
                # convert numerical attributes
                if ATTR[i].type == AttrType.num:  # Note that this will break for "?" (missing attribute)
                    val = float(val)
                rec.append(val)
            test_data.append(rec)
    
    
    with open(OUTFILE, 'w') as output:
        output.write("Id,Target")
        for id, data in enumerate(test_data):
            res = dt.apply_model(data)
            output.write("\n{},{}".format(id + 1, res))
    print("Prediction written to {}".format(OUTFILE))

    


if __name__ == "__main__":
    main()

Build model:
[Root node] 'SHOT_DIST' <= 12.8
  False [Internal node] 'PTS_TYPE' == ? 
    3 [Internal node] 'W' == ? 
      L [Internal node] 'CLOSE_DEF_DIST' <= 3.7
        False [Internal node] 'SHOT_CLOCK' <= 12.4
          False [Internal node] 'PERIOD' == ? 
            7 [Leaf node] class=missed
            4 [Internal node] 'SHOT_NUMBER' <= 5.0
              False [Leaf node] class=missed
              True [Leaf node] class=missed
            5 [Leaf node] class=missed
            6 [Leaf node] class=missed
            1 [Leaf node] class=missed
            2 [Leaf node] class=missed
            3 [Leaf node] class=missed
          True [Internal node] 'TOUCH_TIME' <= 1.5
            False [Leaf node] class=missed
            True [Internal node] 'GAME_CLOCK' <= 363.0
              False [Internal node] 'PERIOD' == ? 
                7 [Leaf node] class=made
                4 [Leaf node] class=missed
                5 [Leaf node] class=made
                6 [Leaf node] class=m