In [1]:
%pylab inline
from collections import Counter

Populating the interactive namespace from numpy and matplotlib


# COMP 598 - Lecture 8 Decision Trees

- Sometimes linear decision boundaries are not expressive enough, and we want a more complex rule set to model our data.
- A dataset can contain numerical as well as non-numerical information (text, categorical, binary).
- The algorithm produces rules that are comprehensible to humans.

*Example of a decision rule:* If texture > a and perimeter > b then "Recurrence"
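
A rule of this kind is simply a boolean predicate over the features. A minimal sketch, assuming hypothetical threshold values for `a` and `b`, a dictionary-style example, and an alternative label `"No recurrence"` (none of these come from the lecture):

```python
def recurrence_rule(example, a=0.5, b=10.0):
    """Hypothetical decision rule; the thresholds a and b are illustrative placeholders."""
    if example["texture"] > a and example["perimeter"] > b:
        return "Recurrence"
    return "No recurrence"

print(recurrence_rule({"texture": 0.7, "perimeter": 12.0}))  # both conditions hold
print(recurrence_rule({"texture": 0.7, "perimeter": 8.0}))   # perimeter is too small
```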

## Interpretation: Nodes
Nodes represent a partitioning of the input feature space:
1. Internal nodes are tests on the values of different features (that is, decision rules).
2. Leaf nodes contain the examples that satisfy the rules of the nodes along the branch taken to reach them. Each leaf typically contains more than one example, and each example falls in exactly one leaf.

## Procedure of Classification
At every node, we apply that node's test to the new example and follow the matching branch of the tree. Once at a leaf, we predict the leaf's class if the leaf is pure; otherwise we either predict the majority class or sample a class from the leaf's class probabilities.
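
The two leaf-level options can be sketched as follows. This is an illustration only: the function name `predict_at_leaf` and its `strategy` argument are assumptions, not part of the lecture code.

```python
import random
from collections import Counter

def predict_at_leaf(leaf_labels, strategy="majority", rng=random):
    """Predict a class from the training labels stored in a leaf.

    strategy="majority" returns the most common label (for a pure leaf this
    is simply its only label); strategy="sample" draws a label with
    probability proportional to its frequency in the leaf.
    """
    if strategy == "majority":
        return Counter(leaf_labels).most_common(1)[0][0]
    if strategy == "sample":
        # picking uniformly among the stored labels samples each class
        # in proportion to how often it occurs in the leaf
        return rng.choice(list(leaf_labels))
    raise ValueError("unknown strategy: %r" % strategy)

print(predict_at_leaf(["A", "A", "A"]))            # pure leaf
print(predict_at_leaf(["A", "B", "A"]))            # majority vote
print(predict_at_leaf(["A", "B", "A"], "sample"))  # random draw, 'A' with probability 2/3
```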

Our learning task here is to choose the tests at every node.

**Naïve Approach:** 
We could enumerate all possible trees (assuming a finite number of tests) and evaluate the error. Select the one that produces the minimum error.

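This is problematic for two reasons: the number of possible trees grows very quickly, so enumeration is computationally infeasible, and selecting the tree with the minimum training error invites overfitting.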

**Idea 1:** 

Given a set of labeled training instances:
1. If all training instances have the same class, create a leaf with that class and exit.
2. Pick the best test to split the data on according to some measure $G$.
3. Split the training set according to the value of the outcome of the test.
4. Recursively repeat steps 2 and 3 on each subset of the training data. 
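
The four steps can be sketched as a short recursive function. This is a skeleton under stated assumptions, not the implementation developed later in these notes: the dictionary-based tree, the `build` and `majority_leaf` names, and the pluggable `choose_test` callable (standing in for "best test according to $G$") are all hypothetical.

```python
from collections import Counter

def majority_leaf(labels):
    """Leaf that predicts the most common label."""
    return {"leaf": Counter(labels).most_common(1)[0][0]}

def build(rows, choose_test):
    """rows: list of (features, label) pairs.
    choose_test: hypothetical callable that returns a predicate on features,
                 or None when it has no test left to offer.
    """
    labels = [label for _, label in rows]
    if len(set(labels)) == 1:                  # step 1: all instances share a class
        return {"leaf": labels[0]}
    test = choose_test(rows)                   # step 2: pick the best test
    if test is None:
        return majority_leaf(labels)
    passed = [r for r in rows if test(r[0])]   # step 3: split on the test outcome
    failed = [r for r in rows if not test(r[0])]
    if not passed or not failed:               # the test separates nothing: stop here
        return majority_leaf(labels)
    return {"test": test,                      # step 4: recurse on each subset
            "pass": build(passed, choose_test),
            "fail": build(failed, choose_test)}

def first_feature_is_5(rows):
    """Toy test chooser: always tests whether the first feature equals 5."""
    return lambda x: x[0] == 5

rows = [((5, 1), "A"), ((5, 0), "A"), ((3, 1), "B"), ((3, 0), "B")]
tree = build(rows, first_feature_is_5)
print(tree["pass"], tree["fail"])
```

The extra stop when a test leaves one side empty is a design choice of this sketch; it guarantees the recursion terminates even if `choose_test` keeps returning the same test.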

- Question 1: What is a "good" test?
- Question 2: How do we generate tests?
- Question 3: When do we stop building the tree so that we do not overfit the data?

# Information Theory

**Definition (Information Content)**

Let $E$ be an event which occurs with probability $P(E)$. If we are told that $E$ has occurred with certainty, then we have received $I(E)$ bits of information:

$$I(E) = \log_2 \frac{1}{P(E)}$$

Example: If you are observing the outcome of a fair six-sided die roll, the roll provides $\log_2 6 \approx 2.58$ bits of information.
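
As a quick numerical check of the definition, here is a small standalone sketch using only the standard library (the helper name `information_bits` is an assumption):

```python
import math

def information_bits(p):
    """Information content, in bits, of an event with probability p (0 < p <= 1)."""
    return math.log(1.0 / p, 2)

print(information_bits(1.0 / 6))  # one face of a fair die: about 2.585 bits
print(information_bits(1.0 / 2))  # one side of a fair coin: 1 bit
```

Rarer events carry more information: halving the probability adds exactly one bit.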

**Definition (Entropy)**

Suppose we have an information source $S$ which emits symbols from an alphabet $\{s_1, \ldots, s_k\}$ with probabilities $\{ p_1, \ldots, p_k \}$. The entropy of $S$ is:

$$H(S) = \sum_{i=1}^k p_i I(s_i) = \sum_{i=1}^k p_i \log_2\Big(\frac{1}{p_i}\Big) = - \sum_{i=1}^k p_i\log_2\big(p_i\big)$$

We can rewrite this in terms of the probability distribution $P:~H(P) = \sum_{i} p_i \log_2 \frac{1}{p_i}$

Interpretation of $H$:
1. Average amount of information per outcome.
2. Average amount of "surprise" when observing an outcome.
3. Uncertainty the observer has before seeing an outcome.
4. Average number of bits needed to communicate an outcome. 
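
To make these interpretations concrete, the sketch below evaluates $H$ for a few simple distributions. The helper name `entropy_bits` is an assumption, and outcomes with $p_i = 0$ are skipped, following the usual convention that $0 \log 0 = 0$.

```python
import math

def entropy_bits(probabilities):
    """Entropy in bits; outcomes with p = 0 contribute nothing."""
    return sum(p * math.log(1.0 / p, 2) for p in probabilities if p > 0)

print(entropy_bits([0.5, 0.5]))     # fair coin: 1 bit
print(entropy_bits([0.9, 0.1]))     # biased coin: about 0.469 bits
print(entropy_bits([1.0, 0.0]))     # certain outcome: 0 bits
print(entropy_bits([1.0 / 6] * 6))  # fair die: about 2.585 bits
```

The more predictable the source, the lower its entropy: the biased coin is less "surprising" on average than the fair one.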

Given a binary dataset $D$ of labels with $p$ positive samples and $n$ negative samples, the entropy is:
$$H(D) = -\frac{p}{p+n}\log_2\frac{p}{p+n} - \frac{n}{p+n}\log_2\frac{n}{p+n}$$
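
As a worked example (the counts are chosen for illustration), take $p = 5$ and $n = 4$:

$$H(D) = -\frac{5}{9}\log_2\frac{5}{9} - \frac{4}{9}\log_2\frac{4}{9} \approx 0.556 \times 0.848 + 0.444 \times 1.170 \approx 0.991 \text{ bits}$$

A perfectly balanced dataset ($p = n$) gives exactly 1 bit, and a pure dataset ($p = 0$ or $n = 0$) gives 0 bits.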


Given a dataset $D$ that can be split into subsets via some tests, we want to know which test reduces the entropy the most.

In [2]:
def information_content(p):
    """ Calculates the information content represented by the probability p """
    return np.log2(1/float(p))

def entropy(P):
    """ Calculates the Entropy of the (discrete) probability distribution P """
    # outcomes with p == 0 contribute nothing (0 * log 0 is taken as 0)
    return np.sum([ p * information_content(p) for p in P if p > 0])

def dataset_entropy(D):
    """ Calculates the Entropy of a dataset
        @params:
        D: the dataset containing only the labels
    """
    D = np.array(D)
    if len(D) == 0:
        # an empty set of labels carries no uncertainty
        return 0.0

    # count how often each distinct label occurs
    labels, counts = np.unique(D, return_counts=True)

    # divide by the size of the dataset to get the probability of each class
    probabilities = counts / float(len(D))

    # return the entropy of the probability distribution
    return entropy(probabilities)

In [3]:
print('Information content of a coin flip is:', information_content(0.5))

('Information content of a coin flip is:', 1.0)


In [4]:
D = ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'B', 'A']
print('Entropy of such a two-class dataset:', dataset_entropy(D))

('Entropy of such a two-class dataset:', 0.991076059838222)


In [5]:
D = ['A', 'A', 'A', 'A', 'B', 'B', 'B', 'B']
print('Entropy of such a balanced two-class dataset:', dataset_entropy(D))

('Entropy of such a balanced two-class dataset:', 1.0)


# Evaluating Tests
**Definition (Conditional Entropy)**

$$H(y~|~x) = \sum_{v} P(x=v) H(y~|~x=v)$$

**Interpretation:** the expected number of bits needed to transmit $y$ if both the emitter and receiver know the possible values of $x$ (but before they are told $x$'s specific value).
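
The definition can be estimated directly from paired samples by grouping the $y$ values by the value of $x$. A minimal standalone sketch (the helper names `label_entropy` and `conditional_entropy` and the toy data are assumptions):

```python
import math
from collections import Counter, defaultdict

def label_entropy(labels):
    """Entropy in bits of the empirical label distribution."""
    total = float(len(labels))
    return sum((c / total) * math.log(total / c, 2) for c in Counter(labels).values())

def conditional_entropy(xs, ys):
    """H(y | x) = sum over v of P(x = v) * H(y | x = v), estimated from samples."""
    groups = defaultdict(list)
    for x, y in zip(xs, ys):
        groups[x].append(y)          # collect the y values seen with each x value
    total = float(len(ys))
    return sum((len(g) / total) * label_entropy(g) for g in groups.values())

ys = ["A", "A", "A", "A", "B", "B", "B", "B"]
informative_x = [5, 5, 5, 5, 3, 3, 3, 3]    # x determines y completely
weak_x = [1, 1, 0, 0, 0, 1, 1, 1]           # x says little about y
print(conditional_entropy(informative_x, ys))  # 0 bits left once x is known
print(conditional_entropy(weak_x, ys))         # about 0.951 bits left
```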


Given a test $T$, a datapoint $d_i\in D$ either passes the test, $T(d_i) = 1$, or fails it, $T(d_i) = 0$. The test therefore divides the dataset into two sets: $D_{T=0}$ and $D_{T=1}$. The conditional entropy of the dataset given the test is:

$$H(D~|~T) = P\Big(T(d)=1\Big)H\Big(D_{T=1}\Big) + P\Big(T(d)=0\Big) H\Big(D_{T=0}\Big)$$

Here $P(T(d)=1)$ is estimated as the fraction of points $d$ that pass the test, and $P(T(d)=0)$ as the fraction that fail it.

**Definition (Information Gain)**
In the context of a test $T$, we can define the information gain on a dataset $D$ as:
$$IG(T) = H(D) - H(D~|~T)$$
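
As a worked example (the counts are chosen for illustration), suppose $D$ has 4 positive and 4 negative labels, so $H(D) = 1$. A test that 5 points pass (2 positive, 3 negative) and 3 points fail (2 positive, 1 negative) gives:

$$H(D_{T=1}) = -\frac{2}{5}\log_2\frac{2}{5} - \frac{3}{5}\log_2\frac{3}{5} \approx 0.971, \qquad H(D_{T=0}) = -\frac{2}{3}\log_2\frac{2}{3} - \frac{1}{3}\log_2\frac{1}{3} \approx 0.918$$

$$H(D~|~T) \approx \frac{5}{8} \times 0.971 + \frac{3}{8} \times 0.918 \approx 0.951, \qquad IG(T) \approx 1 - 0.951 = 0.049$$

By contrast, a test that separates the two classes perfectly has $H(D~|~T) = 0$ and therefore $IG(T) = 1$.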


In classification we pick the test with the highest information gain.
In regression we pick the test with the lowest mean-squared error.
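
For the regression case, one way to score a test is sketched below. Note the assumptions: each subset predicts its own mean, and the two subset errors are combined with size weights, by analogy with $H(D~|~T)$. The notes do not specify these details, and the function names are hypothetical.

```python
def mean_squared_error(values):
    """MSE of predicting the mean of `values` for every element."""
    if not values:
        return 0.0
    mean = sum(values) / float(len(values))
    return sum((v - mean) ** 2 for v in values) / float(len(values))

def split_mse(targets, passed_mask):
    """Size-weighted MSE of the two subsets produced by a test."""
    passed = [t for t, ok in zip(targets, passed_mask) if ok]
    failed = [t for t, ok in zip(targets, passed_mask) if not ok]
    n = float(len(targets))
    return ((len(passed) / n) * mean_squared_error(passed)
            + (len(failed) / n) * mean_squared_error(failed))

targets = [1.0, 1.2, 0.8, 5.0, 5.2, 4.8]
good_test = [True, True, True, False, False, False]   # separates low from high targets
poor_test = [True, False, True, False, True, False]   # mixes low and high targets
print(split_mse(targets, good_test))
print(split_mse(targets, poor_test))
```

The test that groups similar target values together yields the smaller error and would be preferred.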



In [6]:
# our metric for determining which tests are good.
def information_gain(D, T):
    """
        Calculates the information gain given by this test
    """    
    D = np.array(D)
    observations = D.T[-1].T
    return dataset_entropy(observations) - test_entropy(D, T)

def test_entropy(D, T):
    """
        Calculates the entropy of a dataset D due to test T
        @params:
            D: dataset containing observations [(x_11,..., x_1m, y_1), ..., (x_n1, ..., x_nm, y_n)]
            T: test to apply on the features
    """
    # convert to an array so that columns can be sliced.
    D = np.array(D)
    
    # separate observations and labels into their own arrays so that the test cannot access the label itself.
    observations, labels = D.T[0:-1].T, D.T[-1].T

    # apply the test row by row; cast to bool so that tests returning 0/1 also work as a mask
    test_passed = np.apply_along_axis(T ,axis=1,arr=observations).astype(bool)

    # what is the probability that a datapoint passes the test?
    P_test_passed = np.sum(test_passed)/float(len(D))
    P_test_failed = 1 - P_test_passed
    
    
    # what are the labels that passed the test?
    labels_test_passed = labels[ test_passed ]
    labels_test_failed = labels[ np.invert(test_passed) ]
    
    # what is the entropy of the resulting datasets?
    entropy_test_passed = dataset_entropy(labels_test_passed)
    entropy_test_failed = dataset_entropy(labels_test_failed)

    # return the entropy of the dataset given the test
    return P_test_passed * entropy_test_passed + P_test_failed * entropy_test_failed

In [7]:
D = [ (5,1,1,0,'A'), (5,1,0,1,'A'), (5,0,0,1,'A'), (5,0,0,0,'A'), (3,0,0,0,'B'), (3,1,0,0,'B'), (3,1,1,0,'B'), (3,1,1,1,'B') ]
test = lambda d: int(d[0])==5
print('Information Gain due to test1, where resulting subsets have no impurities:',information_gain(D, test))

test2 = lambda d: int(d[1])==1
print('Information gain due to test2, where resulting subsets are diluted:',information_gain(D, test2))
print('Since IG(test1)>IG(test2), test1 is a better test')

('Information Gain due to test1, where resulting subsets have no impurities:', 1.0)
('Information gain due to test2, where resulting subsets are diluted:', 0.048794940695398581)
Since IG(test1)>IG(test2), test1 is a better test


# Candidate Tests

We now need a way to generate tests and pick the best feature. We use the ID3 algorithm, which goes through each unique value of each feature and generates a data split. Once the splits along a feature have been produced, we calculate the information gain for that feature. We then pick the feature that yields the greatest information gain.

Our first function generates all the possible splits along a feature axis:

In [8]:
def generate_split(dataset, feature_index, continuous_decision=np.mean):
    """
        generates a split according to the feature_index
        @params:
            dataset: the dataset 
            feature_index: the index of the feature we want to use
            continuous_decision: function that picks the decision boundary for a continuous feature (defaults to np.mean)
    """
    dataset = np.array(dataset)
    unique_values = np.unique(dataset[:, feature_index])
    
    # we now determine whether these features are continuous or not
    try:
        unique_values.astype(int)
        type_is = 'int'
    except ValueError as ve:
        # these cannot be converted to ints 
        try:
            unique_values = unique_values.astype(float)
            type_is = 'float'
        except ValueError as ve:
            type_is = 'str'
        #endtry
    #endtry
    if type_is in ['str', 'int']:
        for val in unique_values:
            yield dataset[dataset[:, feature_index] == val]
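    else:
        # beta: continuous features are split at a single threshold
        print('Continuous value decisions are in beta')
        threshold = continuous_decision(unique_values)
        # compare numerically; the column may be stored as numeric strings
        column = dataset[:, feature_index].astype(float)
        yield dataset[column >= threshold]
        yield dataset[column < threshold]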

This function calculates the information gain for a split conducted along each feature. It returns an array of information gains, from which the caller selects the index of the maximum.

In [9]:
def best_split_feature(dataset, skip_features=()):
    """
        determines which feature is best to split on.
        @params:
            dataset: the dataset
            skip_features: indices of the features to skip
        @returns:
            array with the information gain of splitting on each feature
            (-inf for skipped features)
    """
    D = np.array(dataset)
    labels = D[:, -1]

    n, m = D.shape
    m -= 1  # account for the label column
    H_D = dataset_entropy(labels)
    gains = H_D * np.ones(m)
    for i in range(m):  # loop over all features
        if i in skip_features:  # skip the features specified by the caller
            gains[i] = -np.inf
            continue
        for subset in generate_split(D, i):  # loop over all splits
            # weight each subset's entropy by its share of the data, as in H(D | T)
            weight = len(subset) / float(n)
            gains[i] -= weight * dataset_entropy(subset[:, -1])
    return gains

This function generates a test on a feature column/index:

In [10]:
def generate_test(feature_column, feature_index, continuous_decision=np.mean):
    """
        Generates a test on a single feature.
        @params:
            feature_column, a slice of all the observations of this feature
            feature_index, the index of this feature in the dataset
            continuous_decision, if the feature is continuous, a function that picks the boundary to test against.
    """
    unique_values = np.unique(feature_column)
    # we now determine whether these features are continuous or not
    try:
        unique_values.astype(int)
        type_is = 'int'
    except ValueError as ve:
        # these cannot be converted to ints 
        try:
            unique_values = unique_values.astype(float)
            type_is = 'float'
        except ValueError as ve:
            type_is = 'str'
        #endtry
    #endtry
#     print type_is
    
    if type_is in ['int', 'str']:
        # pick the most common value, hoping that it separates the data well
        boundary = Counter(feature_column).most_common(1)[0][0]
        print('Picked feature',feature_index,' with boundary ',boundary)
        # compare as strings: np.array stores a mixed dataset as strings,
        # while new points passed to classify may hold raw ints
        return lambda d: str(d[feature_index]) == str(boundary)
    else:
        boundary = continuous_decision(unique_values)
        # convert before comparing, since the value may be a numeric string
        return lambda d: float(d[feature_index]) >= boundary
    

Putting the functions together we get a `generate_best_test` function which returns a tuple of feature_index and the test itself.

In [11]:
def generate_best_test(dataset, skip_features=[], continuous_decision = np.mean):
    """
        generates the best possible test given a dataset, skipping features specified
        @params:
            dataset: the dataset to conduct the split on
            skip_features: the features to skip
            continuous_decision: function that picks the decision boundary for continuous features (defaults to np.mean).
        @returns:
            (feature_index, best_test)
    """
    dataset = np.array(dataset)
    feature_index = np.argmax(best_split_feature(dataset, skip_features=skip_features))
    return feature_index, generate_test(dataset[:,feature_index], feature_index, continuous_decision=continuous_decision)

In [12]:
best_test = generate_best_test(D)
print(best_test)

('Picked feature', 0, ' with boundary ', '3')
(0, <function <lambda> at 0x10207a5f0>)


We now write some scaffolding for the tree.

In [13]:
class Node(object):
    def __init__(self, L=None, R=None, details=None):
        self.L = L
        self.R = R
        self.details = details
        
    def is_leaf(self):
        return self.L is None and self.R is None
        
class Tree(object):
    def __init__(self, root):
        self.root = root
    @staticmethod
    def build_tree():
        return Tree(Node())

In [14]:
class DecisionNode(Node):
    def __init__(self, L=None, R=None, data_points=[], details=None, skip_features=[]):
#         print('Building a decision node')
#         print('Given datapoints:',data_points)

        # base case 1 (only one data point)
        if len(data_points) == 1:
            self.L=None
            self.R=None
            self.data_points = data_points
            # the label is the last column of the single remaining row
            self.label = data_points[0][-1]
        else:
                        
            D = np.array(data_points)
            # separate into observations and labels
            observations, labels = D.T[:-1].T, D.T[-1].T
            
            # base case 2 (no more features to divide)
            n,m = observations.shape
            if m == len(skip_features):
                self.label = Counter(labels).most_common(1)[0][0]
                self.data_points = data_points
                self.L=None
                self.R=None
            else:

                if len(np.unique(labels))==1:
                    # this is a pure node, stop recursing
                    self.L= None
                    self.R=None
                    self.data_points = data_points
                    self.label = np.unique(labels)[0]
                else:

                    # generate a test
                    feature_index, self.decision_function = generate_best_test(data_points, skip_features=skip_features)

                    # apply the test
                    test_passed = np.apply_along_axis(self.decision_function, axis=1, arr=observations)
                    # copy rather than append, so that sibling branches do not
                    # share (and mutate) the same list of used features
                    child_skip_features = list(skip_features) + [feature_index]

                    # obtain the separated datapoints
                    passed_datapoints = D[test_passed, :]
                    failed_datapoints = D[np.invert(test_passed), :]

                    # recursively build nodes.
                    self.L = DecisionNode(data_points=passed_datapoints, skip_features=child_skip_features)
                    self.R = DecisionNode(data_points=failed_datapoints, skip_features=child_skip_features)
                    self.details = details
                    self.feature_index = feature_index
                    self.data_points = data_points
            
    def classify(self,data_point):
        print('Now classifying',data_point)
        if self.is_leaf():
            return self.label
        rule_result = self.decision_function(data_point)
        if rule_result:
            return self.L.classify(data_point)
        else:
            return self.R.classify(data_point)

#     def left_entropy(self):
#         return dataset_entropy(self.L.data_points)
    
#     def right_entropy(self):
#         return dataset_entropy(self.R.data_points)

        
    
class DecisionTree(Tree):
    def __init__(self, dataset, root):
        self.dataset = dataset
        self.root = root
    @staticmethod
    def build_tree(dataset):
        return DecisionTree( dataset= dataset, root= DecisionNode(data_points = dataset, skip_features=[]) )
    def classify(self, data_point):
        return self.root.classify(data_point)

In [15]:
D

[(5, 1, 1, 0, 'A'),
 (5, 1, 0, 1, 'A'),
 (5, 0, 0, 1, 'A'),
 (5, 0, 0, 0, 'A'),
 (3, 0, 0, 0, 'B'),
 (3, 1, 0, 0, 'B'),
 (3, 1, 1, 0, 'B'),
 (3, 1, 1, 1, 'B')]

In [16]:
tree = DecisionTree.build_tree(D)

('Picked feature', 0, ' with boundary ', '3')


In [17]:
tree.classify(D[1])

('Now classifying', (5, 1, 0, 1, 'A'))
('Now classifying', (5, 1, 0, 1, 'A'))


'A'

In [18]:
tree.classify(('10',0,0,0))

('Now classifying', ('10', 0, 0, 0))
('Now classifying', ('10', 0, 0, 0))


'A'