Code to accompany Machine Learning Recipes #8. We'll write a Decision Tree Classifier, in pure Python. Below each of the methods, I've written a little demo to help explain what it does.

In [1]:
import pandas as pd
import numpy as np
from collections import Counter
from sklearn.model_selection import train_test_split

## Decision Tree Implementation
* The Tree takes the whole Dataset as an input
* DataSet must be a list of list with each list in the dataset representing a row
* The implementation presumes **the last column represents label**

##### Helper Functions

In [2]:
### Method to find unique values in a Column
def find_unique_feature_values(rows, col):
    return set([row[col] for row in rows])

In [3]:
### Count the number of each label in the data
### The method assumes the last column to be the label in the dataset
def label_count(rows):
    counts = Counter((np.array(rows))[:,-1]) # a dictionary of label -> count.
    return counts
    

In [4]:
### Check if a value is numeric value.
def is_numeric(value):
    return isinstance(value, int) or isinstance(value, float)

##### Class to hold the data partition criteria

In [5]:
##This class  is used to partition the dataset.it records the column index and the value used for partition criteria
##records a 'column number' e.g., 0 for 'Outlook' and a value e.g. 'Sunny'
##The 'compare' method is used to compares the passed feature value with the partition citeria value 
    
class partition_criteria:
    
    def __init__(self, column, value):
        self.col = column
        self.value = value

    def compare(self, row):
        val = row[self.col]
        if is_numeric(val):
            return val >= self.value
        else:
            return val == self.value

    def __repr__(self):
        # Helper function to print the criteria
        condition = "=="
        if is_numeric(self.value):
            condition = ">="
        return "%s %s %s?" % (header[self.col], condition, str(self.value))

##### Splitter Function : Splits and retuns lists of  passed and failed data

In [6]:
##get_split() : Splits the dataset ,  
##for each row in the dataset checks if it matches the partition criteria. 
##if it matches add to 'success' rows, otherwise, add it to 'failed' rows.

def get_split(rows, criteria):
    success, failed = [], []
    for row in rows:
        if criteria.compare(row):
            success.append(row)
        else:
            failed.append(row)
    return success, failed

##### Function to calculate Gini Impurity

In [7]:
##Calculate the gini impurity as : 
## gini_impurity = 1 - sum(probability_of_each_class**2)
def gini_impurity(rows):
    counts = label_count(rows)
    impurity = 1
    for cls in counts:
        probability = counts[cls] / len(rows)
        impurity -= probability**2
    return impurity

##### Function to calculate Information Gain

In [8]:
### Information Gain is defined as:
### IG = Starting_impurity - sum(Weighted_impurity_of_each_label for a feature) 

def info_gain(left, right, current_uncertainty):
    p = float(len(right)) / (len(left) + len(right))
    return current_uncertainty - p * gini_impurity(right) - (1 - p) * gini_impurity(left)

##### Function to find the best split : Returns the spilt Criteri and best IG value

In [9]:
## Find the best criteria to split by iterating over every feature / value
## and calculating the information gain."""

def find_best_split(rows):
    
    best_IG = 0  # record the best information gain
    best_criteria = None  # record the feature / value that produced it
    
    current_uncertainty = gini_impurity(rows)
    
    # n_features = number of columns, excluding the label column which we presume is the last column
    n_features = len(rows[0])-1   
    
    for col in range(n_features):  # for each feature
        
        #get unique values in the column
        values = set([row[col] for row in rows])

        # for each value
        for val in values:  

            # find the best criteria to split
            criteria = partition_criteria(col, val)

            # split the dataset on criteria
            pass_rows, fail_rows = get_split(rows, criteria)

            # if pass_rows or fail_rows ==0 skip this split as it doesn't divide the dataset
            if len(pass_rows) == 0 or len(fail_rows) == 0:
                continue

            # Calculate the information gain from this split
            gain = info_gain(pass_rows, fail_rows, current_uncertainty)

            # if the gain >= best_IG set best_IG and best_criteria.
            if gain >= best_IG:
                best_IG, best_criteria = gain, criteria

    return best_IG, best_criteria

##### This class represents the Leaf Node in a Tree

In [10]:
##Leaf class just holds the labels/predictions in a dictionary {'Yes', 'No'}
class Leaf_Node:
    def __init__(self, rows):
        self.predictions = dict(label_count(rows))
        

##### This class represents a non-Leaf Node or a Decison Nede it holds :
* Split Criteria for the Node
* The passed branch (passes criteria)
* The failed branch (fails criteria)

In [11]:
## This class holds a reference to the criteria, and the two child nodes.
class Decision_Node:
    def __init__(self,
                 criteria,
                 passed_branch,
                 failed_branch):
        self.criteria = criteria
        self.passed_branch = passed_branch
        self.failed_branch = failed_branch

##### Builds the Tree recursively 

In [12]:
## This method Builds the tree recursively.

def build_tree(rows):
    
    # Partitioning the dataset on each of the unique attribute,
    # calculate the information gain,
    # and return the criteria that produces the highest gain.
    #print(f"buid_tree = {rows}")
    info_gain, criteria = find_best_split(rows)
    #print(f"Node Split criteria : {criteria}")
    
    # Base case: no further info gain
    # There are no more criterias for comparison, return a leaf node
    if info_gain == 0:
        return Leaf_Node(rows)

    # Found useful feature/value to split the data
    passed_rows, failed_rows = get_split(rows, criteria)

    # Recursively build the branch that passed the criteria.
    successful_branch = build_tree(passed_rows)

    # Recursively build the branch that failed the criteria.
    failed_branch = build_tree(failed_rows)

    # Return a Decision node.
    # This Node represents :
      ### The  feature / value to check for at this point,
      ### Branches to follow depending on the answer.
    return Decision_Node(criteria, successful_branch, failed_branch)

##### Prints the Tree Recursively : Prints the Criteria for the Node and at the leaf the label and Probability

In [13]:
def print_tree(node, spacing=""):
        
    if isinstance(node, Leaf_Node):
        prob = {}
        total = sum(node.predictions.values())
        for key in node.predictions.keys():
            prob[key] = str((node.predictions[key]/total *100)) + '%'
        print (spacing + f"Decision : {prob}")
        return

    # Print the criteria at the node
    print (spacing + str(node.criteria))

    # Call recursively on the passed branch
    print (spacing + '--> True:')
    print_tree(node.passed_branch, spacing + "  ")

    # Call recursively on the failed branch
    print (spacing + '--> False:')
    print_tree(node.failed_branch, spacing + "  ")

##### Recursive Classification Function : Takes a row and decides the Label for it.

In [14]:
## Method to run the classification on Data recursively
def classify(row, node):
    # If the node of type Leaf, we have a decision on class
    if isinstance(node, Leaf_Node):
        return node.predictions

    # Now do the classification of two branches passed-branch / failed-branch.
    # Compare the feature / value stored in the node, to the record we're considering.
    if node.criteria.compare(row):
        return classify(row, node.passed_branch)
    else:
        return classify(row, node.failed_branch)

#### Print the Leaf Node

In [15]:
## Print the leaf node with probability as %
def print_leaf(counts):
    total = sum(counts.values()) * 1.0
    probs = {}
    for lbl in counts.keys():
        probs[lbl] = str(int(counts[lbl] / total * 100)) + "%"
    return probs

#### Calculate the accuracy of the DT

In [16]:
def accuracy(data):
    accuracy_score = 0
    total =0
    for row in testing_data:
        counts = classify(row, my_tree)
        total += sum(counts.values()) * 1.0
        if row[-1] in counts:
            accuracy_score += counts[row[-1]]
    return accuracy_score/total * 100        
    

## Test  the Decision Tree for Weather Data

In [17]:
### Read the csv using Pandas and split into training and testing Data

weather_data = "seattle-weather.csv"
weatherData = pd.read_csv(weather_data)
weatherData = weatherData.drop(["date"] , axis=1,inplace=False)
header = list(weatherData.columns.values)
weatherData = weatherData.values.tolist()
training_data, testing_data = train_test_split(weatherData, test_size=.40, random_state=1)

In [18]:
weatherData

[[0.0, 12.8, 5.0, 4.7, 'drizzle'],
 [10.9, 10.6, 2.8, 4.5, 'rain'],
 [0.8, 11.7, 7.2, 2.3, 'rain'],
 [20.3, 12.2, 5.6, 4.7, 'rain'],
 [1.3, 8.9, 2.8, 6.1, 'rain'],
 [2.5, 4.4, 2.2, 2.2, 'rain'],
 [0.0, 7.2, 2.8, 2.3, 'rain'],
 [0.0, 10.0, 2.8, 2.0, 'sun'],
 [4.3, 9.4, 5.0, 3.4, 'rain'],
 [1.0, 6.1, 0.6, 3.4, 'rain'],
 [0.0, 6.1, -1.1, 5.1, 'sun'],
 [0.0, 6.1, -1.7, 1.9, 'sun'],
 [0.0, 5.0, -2.8, 1.3, 'sun'],
 [4.1, 4.4, 0.6, 5.3, 'snow'],
 [5.3, 1.1, -3.3, 3.2, 'snow'],
 [2.5, 1.7, -2.8, 5.0, 'snow'],
 [8.1, 3.3, 0.0, 5.6, 'snow'],
 [19.8, 0.0, -2.8, 5.0, 'snow'],
 [15.2, -1.1, -2.8, 1.6, 'snow'],
 [13.5, 7.2, -1.1, 2.3, 'snow'],
 [3.0, 8.3, 3.3, 8.2, 'rain'],
 [6.1, 6.7, 2.2, 4.8, 'rain'],
 [0.0, 8.3, 1.1, 3.6, 'rain'],
 [8.6, 10.0, 2.2, 5.1, 'rain'],
 [8.1, 8.9, 4.4, 5.4, 'rain'],
 [4.8, 8.9, 1.1, 4.8, 'rain'],
 [0.0, 6.7, -2.2, 1.4, 'drizzle'],
 [0.0, 6.7, 0.6, 2.2, 'rain'],
 [27.7, 9.4, 3.9, 4.5, 'rain'],
 [3.6, 8.3, 6.1, 5.1, 'rain'],
 [1.8, 9.4, 6.1, 3.9, 'rain'],
 [13.5, 8.9, 3.

In [19]:
#Test label_count
#counts=label_count(training_data)
#counts

In [20]:
# Test the partition_criteria
#pc = partition_criteria(0, 'Rainfall')
#pc

In [21]:
#Test label_count
#counts=label_count(training_data)
#counts

In [22]:
# Test using a sample row from training_data
#sample = training_data[11]
#print(sample)
#pc.compare(sample) 

In [23]:
# Test get_split
#passed_rows, failed_rows = get_split(training_data, partition_criteria(0, 'Rainfall'))
#passed_rows

In [24]:
#######
# Test gini_impurity for training data
# Calculate the impurity of training data.
#current_impurity = gini_impurity(training_data)
#current_impurity

In [25]:
# Information gain by partioning on 'Rainfall'?
#p_rows, f_rows = get_split(training_data, partition_criteria(0, 'Rainfall'))
#info_gain(p_rows, f_rows, current_impurity)

In [26]:
# Information gain by partioning on 'Overcast'
#p_rows, f_rows = get_split(training_data, partition_criteria(0,'Overcast'))
#info_gain(p_rows, f_rows, current_impurity)

In [27]:
# More information is gained using 'Overcast' (0.15), than 'Rainfall' (0.0042).
# Why? Look at the different splits that result, and see which one
# looks more 'unmixed' to you.
#p_rows, f_rows = get_split(training_data, partition_criteria(0,'Overcast'))

# Here, the true_rows contain only 'Yes'.
#p_rows

In [28]:
# And the false rows contain two types of Decisions (Yes and No).Which means more impurity
#f_rows

In [29]:
# On the other hand, partitioning by Rainfall doesn't help so much.(No pure groups)
#passed_rows, failed_rows = get_split(training_data, partition_criteria(0,'Rainfall'))

# We've isolated one apple in the true rows.
#passed_rows

In [30]:
# But, the false-rows are badly mixed up.
#failed_rows
#######

In [31]:
# Test find_best_criteria on training_data .
#best_gain, best_criteria = find_best_split(training_data)
#best_criteria

#### Build a Tree using Training Data

In [32]:
my_tree = build_tree(training_data)

##### Print the Tree

In [33]:
print_tree(my_tree)

precipitation >= 0.3?
--> True:
  temp_min >= 1.1?
  --> True:
    temp_min >= 3.3?
    --> True:
      precipitation >= 23.9?
      --> True:
        precipitation >= 25.4?
        --> True:
          Decision : {'rain': '100.0%'}
        --> False:
          Decision : {'snow': '100.0%'}
      --> False:
        Decision : {'rain': '100.0%'}
    --> False:
      wind >= 4.2?
      --> True:
        wind >= 4.3?
        --> True:
          precipitation >= 1.3?
          --> True:
            temp_max >= 8.3?
            --> True:
              temp_max >= 8.9?
              --> True:
                wind >= 5.1?
                --> True:
                  Decision : {'rain': '100.0%'}
                --> False:
                  wind >= 4.9?
                  --> True:
                    Decision : {'snow': '100.0%'}
                  --> False:
                    Decision : {'rain': '100.0%'}
              --> False:
                Decision : {'snow': '100.0%'}
            --> Fa

In [34]:
## Test Classify :
#classify(training_data[0], my_tree)


In [35]:
## Test print_leaf
#print_leaf(classify(training_data[0], my_tree))

#### Test the DT for testing_data

In [36]:
for row in testing_data:
    print (f"Actual: {row[-1]}. Prediction_with_Probability: { print_leaf(classify(row, my_tree)) }")

Actual: drizzle. Prediction_with_Probability: {'sun': '100%'}
Actual: sun. Prediction_with_Probability: {'drizzle': '100%'}
Actual: rain. Prediction_with_Probability: {'fog': '100%'}
Actual: sun. Prediction_with_Probability: {'rain': '100%'}
Actual: rain. Prediction_with_Probability: {'rain': '100%'}
Actual: rain. Prediction_with_Probability: {'rain': '100%'}
Actual: fog. Prediction_with_Probability: {'sun': '100%'}
Actual: rain. Prediction_with_Probability: {'rain': '100%'}
Actual: rain. Prediction_with_Probability: {'rain': '100%'}
Actual: rain. Prediction_with_Probability: {'rain': '100%'}
Actual: sun. Prediction_with_Probability: {'sun': '100%'}
Actual: fog. Prediction_with_Probability: {'sun': '100%'}
Actual: sun. Prediction_with_Probability: {'sun': '100%'}
Actual: sun. Prediction_with_Probability: {'fog': '100%'}
Actual: rain. Prediction_with_Probability: {'rain': '100%'}
Actual: sun. Prediction_with_Probability: {'sun': '100%'}
Actual: sun. Prediction_with_Probability: {'sun': 

In [37]:
accuracy(testing_data) 

98.26314050626938