This assignment is based on the example provided by Josh Gordon for his Machine Learning Recipes series.
https://github.com/random-forests/tutorials

In [None]:
from __future__ import print_function
from typing import List
# 
# This notebook contains ASSIGNMENTs which might require some coding, these coding assignments are followed by tests
# where you can check if your implementation is correct. Try to go through the blocks slowly and execute them once you
# feel like you understand what they are doing.
# 

# Below is our toy dataset we are going to use for building a decision tree using CART
# 
# Format: each row is an example.
# The last column is the label.
# The first two columns are features.
# Feel free to play with it by adding more features & examples.
# Interesting note: I've written this so the 2nd and 5th examples
# have the same features, but different labels - so we can see how the
# tree handles this case.
training_data = [
    # Each row: [color, diameter, label] -- same order as `header` below.
    ['Green', 3, 'Apple'],
    ['Yellow', 3, 'Apple'],
    ['Red', 1, 'Grape'],
    ['Red', 1, 'Grape'],
    # Same features as the 2nd row ('Yellow', 3) but a different label.
    ['Yellow', 3, 'Lemon'],
]
# Column labels.
# These are used only to print the tree.
header = ["color", "diameter", "label"]

In [None]:
# Some utility functions
# 
# One example row: the feature values followed by the label in the last
# position. Rows mix strings and numbers (e.g. ['Green', 3, 'Apple']), so
# the element type is intentionally left open instead of claiming `float`.
Sample = List[object]
# A dataset is simply a list of such rows.
Dataset = List[Sample]

def unique_vals(rows: Dataset, col: int) -> set:
    """Find the unique values for a column in a dataset.

    Args:
        rows: The dataset; every row must have an entry at index `col`.
        col: Index of the column to inspect.

    Returns:
        A set holding each distinct value that occurs in that column.
    """
    return {row[col] for row in rows}

# Demo: distinct diameters (column 1) and distinct labels (column 2).
diameter_values = unique_vals(training_data, 1)
label_values = unique_vals(training_data, 2)
print("unique_vals(training_data, 1): ", diameter_values)
print("unique_vals(training_data, 2): ", label_values)
print()

def class_counts(rows: Dataset):
    """Tally how many rows carry each label.

    The label is read from the last column of every row. The result is a
    plain dict mapping label -> count, keyed in order of first appearance.
    """
    tally = {}
    for row in rows:
        label = row[-1]  # the label always sits in the final column
        tally[label] = tally.get(label, 0) + 1
    return tally

# Demo: how often each label occurs in the toy dataset.
label_tally = class_counts(training_data)
print("class_counts(training_data): ", label_tally)
print()

def is_numeric(value) -> bool:
    """Return True when `value` is an int or a float, False otherwise."""
    return isinstance(value, (int, float))

# Demo: an int counts as numeric, a string does not.
for shown, probe in (("is_numeric(7): ", 7), ("is_numeric(\"Red\"): ", "Red")):
    print(shown, is_numeric(probe))
print()

In [None]:
# CONTAINS ASSIGNMENT 1
# 
class Question:
    """A Question is used to partition a dataset, so it is used in the
    eventual decision tree to decide to which child node or leaf to continue
    with a particular data row.

    This class just records a 'column number' (e.g., 0 for Color) and a
    'column value' (e.g., Green). The 'match' method is used to compare
    the feature value in an example to the feature value stored in the
    question. See Assignment 1.
    """

    def __init__(self, column, value):
        self.column = column
        self.value = value

    def match(self, example) -> bool:
        """Check whether `example` answers this question with "yes".

        ASSIGNMENT 1: compare the value in `example` at `self.column` to
        `self.value`.

        Numeric example values match when they are greater than or equal
        to the question value; all other values match on equality.

        Args:
            example: A data row; it is indexed with `self.column`.

        Returns:
            True if the example satisfies the question, False otherwise.
        """
        val = example[self.column]
        # Use ">=" only when both sides are numbers, so a numeric feature
        # compared against a string question value is simply "no match"
        # instead of a TypeError.
        if is_numeric(val) and is_numeric(self.value):
            return val >= self.value
        return val == self.value

    def __repr__(self) -> str:
        # This is just a helper method to print
        # the question in a readable format.
        condition = ">=" if is_numeric(self.value) else "=="
        return "Is %s %s %s?" % (
            header[self.column], condition, str(self.value))

In [None]:
# TEST ASSIGNMENT 1:
# 
# Let's write a question for a numeric attribute
# A numeric question: is the diameter (column 1) at least 3?
question1 = Question(1, 3)

# Ask it for one row that should match and one that should not.
answer1, answer2 = (
    question1.match(row)
    for row in (['Green', 3, 'Apple'], ['Red', 1, 'Grape'])
)

print("#", question1, " -> ", answer1, ", ", answer2)
print("PASS" if answer1 and not answer2 else "FAIL")
# should print: 
# Is diameter >= 3?  ->  True ,  False

In [None]:
# TEST ASSIGNMENT 1:
# 
# Let's write a question for a categorical attribute
# A categorical question: is the color (column 0) equal to 'Green'?
question2 = Question(0, 'Green')

# The green apple must match, the red grape must not.
answer3 = question2.match(['Green', 3, 'Apple'])
answer4 = question2.match(['Red', 1, 'Grape'])

print("#", question2, " -> ", answer3, ", ", answer4)
verdict = "PASS" if (answer3 and not answer4) else "FAIL"
print(verdict)
# should print: 
# Is color == Green?  ->  True ,  False

In [None]:
# CONTAINS ASSIGNMENT
# 
def partition(rows: Dataset, question: Question) -> (Dataset, Dataset):
    """Partitions a dataset.

    ASSIGNMENT 2: for each row in the dataset, check if it matches the
    question. If so, add it to 'true rows', otherwise, add it to
    'false rows'.

    Args:
        rows: The rows to split.
        question: Object whose `match(row)` result decides the side a row
            ends up on.

    Returns:
        A `(true_rows, false_rows)` tuple of lists. Within each list the
        rows keep the relative order they had in `rows`.
    """
    true_rows, false_rows = [], []
    for row in rows:
        if question.match(row):
            true_rows.append(row)
        else:
            false_rows.append(row)
    return true_rows, false_rows

In [None]:
# TEST ASSIGNMENT 2:
# 
# Let's partition the training data based on whether rows are Red.
training_data = [
    ['Green', 3, 'Apple'],
    ['Yellow', 3, 'Apple'],
    ['Red', 1, 'Grape'],
    ['Red', 1, 'Grape'],
    ['Yellow', 3, 'Lemon'],
]
true_rows, false_rows = partition(training_data, Question(0, 'Red'))

# This will contain all the 'Red' rows.
print("#", true_rows)
# This will contain all the non-'Red' rows.
print("#", false_rows)

# Compare as sorted lists instead of sets: the two 'Red' rows are identical,
# so a set would collapse them and a partition that lost one of them would
# still pass. Sorting keeps the check independent of row order.
expected_true = training_data[2:4]
expected_false = [training_data[0], training_data[1], training_data[4]]
if (sorted(map(tuple, true_rows)) == sorted(map(tuple, expected_true)) and
        sorted(map(tuple, false_rows)) == sorted(map(tuple, expected_false))):
    print("PASS")
else:
    print("FAIL")
# should print: 
# [['Red', 1, 'Grape'], ['Red', 1, 'Grape']]
# [['Green', 3, 'Apple'], ['Yellow', 3, 'Apple'], ['Yellow', 3, 'Lemon']]

In [None]:
# DEMO:
# 
# The Gini impurity tells us basically how likely we are to incorrectly guess the label of a randomly
# picked row in a (partial) dataset, when the guess is drawn at random from the labels in that dataset.
# So generally, the fewer different labels there are in a dataset, the lower the impurity. Low impurity
# is what we strive for, as it means that this (partial) dataset is easy to predict.
# Let's look at some examples to understand how Gini impurity works.
#
def gini(rows: Dataset) -> float:
    """Return the Gini impurity of the labels found in `rows`.

    Starting from 1, the squared share of every label is subtracted, so a
    dataset with a single label scores 0 and more evenly mixed labels
    score higher. See:
    https://en.wikipedia.org/wiki/Decision_tree_learning#Gini_impurity
    """
    impurity = 1
    # Subtract one label at a time, in first-seen label order; the order is
    # kept fixed so the floating point result is reproducible.
    for label_count in class_counts(rows).values():
        share = label_count / float(len(rows))
        impurity -= share ** 2
    return impurity

# A dataset that holds a single label has no mixing at all.
no_mixing = [['Apple'], ['Apple']]
# Prints 0.0: with only one label present a guess can never be wrong.
no_mixing_impurity = gini(no_mixing)
print(no_mixing_impurity)
# Next, a dataset with a 50:50 apples:oranges ratio.
some_mixing = [['Apple'], ['Orange']]
# Prints 0.5 - meaning there is a 50% chance of misclassifying: a random
# element of this dataset is either an apple or an orange.
some_mixing_impurity = gini(some_mixing)
print(some_mixing_impurity)

In [None]:
# ASSIGNMENT 3:
# 
# try to predict what the gini impurity is of the more_mixing set of labels
# Three rows, each carrying a different label.
more_mixing = [[fruit] for fruit in ('Apple', 'Grape', 'Lemon')]
print(gini(more_mixing))

In [None]:
# DEMO:
# 
# Calculate the uncertainty of our training data.
training_data = [
    ['Green', 3, 'Apple'],
    ['Yellow', 3, 'Apple'],
    ['Red', 1, 'Grape'],
    ['Red', 1, 'Grape'],
    ['Yellow', 3, 'Lemon'],
]
# Gini impurity of the complete toy dataset, before any split is applied.
current_uncertainty = gini(training_data)
# Bare expression: when run as a notebook cell its value is shown as the
# cell output.
current_uncertainty

In [None]:
# ASSIGNMENT 4
# 
# implement the info_gain function
# 
# After calculating the gini impurity for the original dataset, we would like to know what question we should
# ask to separate the dataset most optimally. The function that quantifies how well a question splits our
# dataset is called the info gain function. It is calculated as follows:
# 
# calculate the weighted average of the gini impurities of the two partial datasets (each weighted by the
# fraction of the rows it contains) and subtract it from the gini impurity of the original dataset.

def info_gain(left: Dataset, right: Dataset, current_gini_impurity: float) -> float:
    """Information gain of splitting a dataset into `left` and `right`.

    ASSIGNMENT 4: the gain is the impurity of the unsplit dataset minus
    the weighted average impurity of the two parts, where each part is
    weighted by the fraction of the rows it holds.

    Args:
        left: Rows on one side of the split.
        right: Rows on the other side of the split.
        current_gini_impurity: Gini impurity of the dataset before the
            split.

    Returns:
        The reduction in Gini impurity achieved by the split.

    Raises:
        ZeroDivisionError: If both `left` and `right` are empty.
    """
    # fraction of data that is in the left dataset
    p_left = float(len(left)) / (len(left) + len(right))

    # weighted average of the two impurities; the right part gets the
    # remaining fraction (1 - p_left)
    gini_weighted_average = p_left * gini(left) + (1 - p_left) * gini(right)

    return current_gini_impurity - gini_weighted_average

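Hint: $\mathrm{weighted\_average} = \sum_i p_i x_i$, where $p_i$ is the fraction of the rows in part $i$ and $x_i$ is the gini impurity of part $i$.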

In [None]:
# TEST ASSIGNMENT 4:
# 
import math

training_data = [
    ['Green', 3, 'Apple'],
    ['Yellow', 3, 'Apple'],
    ['Red', 1, 'Grape'],
    ['Red', 1, 'Grape'],
    ['Yellow', 3, 'Lemon'],
]
# How much information do we gain by partitioning the original dataset on 'Green'?
true_rows, false_rows = partition(training_data, Question(0, 'Green'))
info_gain_green = info_gain(true_rows, false_rows, gini(training_data))

# What about if we partitioned on 'Red' instead?
true_rows, false_rows = partition(training_data, Question(0,'Red'))
info_gain_red = info_gain(true_rows, false_rows, gini(training_data))

print("#", info_gain_green)
print("#", info_gain_red)

# Compare with a tolerance instead of '==': a mathematically equivalent
# info_gain that orders its float operations differently may differ in the
# last digit and must not be reported as a failure.
if (math.isclose(info_gain_green, 0.1399999999999999) and
        math.isclose(info_gain_red, 0.37333333333333324)):
    print("PASS")
else:
    print("FAIL")
# should print: 
# 0.1399999999999999
# 0.37333333333333324

In [None]:
# ASSIGNMENT 5:
# 
# It looks like we learned more using 'Red' (0.37), than 'Green' (0.14).
# Why? Look at the different splits that result, and see which one
# looks more 'unmixed' to you.

# Split on 'Red' and show the labels that land on each side.
questionRed = Question(0,'Red')
print(questionRed)
true_rows, false_rows = partition(training_data, questionRed)
print("partial set 1 -> ", [item[2] for item in true_rows])
print("partial set 2 -> ", [item[2] for item in false_rows])
print()

# Same for 'Green'. The second list is the non-matching side, so it is
# labelled "partial set 2" (it used to be printed as "partial set 1" too).
questionGreen = Question(0,'Green')
print(questionGreen)
true_rows, false_rows = partition(training_data, questionGreen)
print("partial set 1 -> ", [item[2] for item in true_rows])
print("partial set 2 -> ", [item[2] for item in false_rows])

In [None]:
# DEMO:
# 
# Now we define a function that tries to figure out the best question to ask
# for a dataset. It iterates over all the features and values and tries all the
# combinations to find the best one
def find_best_split(rows: Dataset) -> (float, Question):
    """Search every feature / value pair for the most informative question.

    Each distinct value of each feature column is turned into a Question;
    the question with the highest information gain wins. Returns a
    `(gain, question)` tuple; the question stays None when no candidate
    divides the rows into two non-empty parts.
    """
    top_gain, top_question = 0, None
    baseline = gini(rows)  # impurity before splitting
    feature_count = len(rows[0]) - 1  # the last column is the label

    for col in range(feature_count):
        for val in {row[col] for row in rows}:
            candidate = Question(col, val)
            matched, unmatched = partition(rows, candidate)

            # A question that leaves one side empty does not split anything.
            if not matched or not unmatched:
                continue

            gain = info_gain(matched, unmatched, baseline)

            # '>=' (not '>') on purpose: among equally good questions the
            # one tried last wins, which shapes the tree for the toy data.
            if gain >= top_gain:
                top_gain, top_question = gain, candidate

    return top_gain, top_question

training_data = [
    ['Green', 3, 'Apple'],
    ['Yellow', 3, 'Apple'],
    ['Red', 1, 'Grape'],
    ['Red', 1, 'Grape'],
    ['Yellow', 3, 'Lemon'],
]
# Lets try to find the best question to ask for our initial dataset
best_gain, best_question = find_best_split(training_data)

print(best_question)
print("info_gain: ", best_gain)
true_rows, false_rows = partition(training_data, best_question)
print("partial set 1 -> ", [item[2] for item in true_rows])
# The non-matching rows form the second partial set (this line used to be
# labelled "partial set 1" as well).
print("partial set 2 -> ", [item[2] for item in false_rows])

# FYI: is color == Red is just as good. See the note in the code above
# where I used '>='.

In [None]:
# DEMO:
# 
# Definition of our tree leaf and decision nodes
class Leaf:
    """Terminal node of the tree; it holds the classification result.

    `predictions` is a dictionary of class (e.g., "Apple") -> number of
    training rows with that class that ended up at this leaf.
    """

    def __init__(self, rows: Dataset):
        label_totals = class_counts(rows)
        self.predictions = label_totals

class Decision_Node:
    """Inner node of the tree: one question plus the two subtrees.

    `true_branch` is followed for rows that match the question,
    `false_branch` for all other rows.
    """

    def __init__(self, question: Question, true_branch, false_branch):
        self.question, self.true_branch, self.false_branch = (
            question, true_branch, false_branch)

In [None]:
# DEMO:
# 
# Function for recursively building the decision tree by starting with the dataset and using
# find_best_split to split the current dataset over child nodes. The recursion stops once it runs out of
# questions to ask
def build_tree(rows: Dataset) -> Decision_Node:
    """Recursively grow a decision tree for `rows`.

    The best question for the rows is looked up with find_best_split. When
    it yields no information gain the rows become a Leaf; otherwise the
    rows are partitioned on that question and a subtree is grown for each
    side (matching rows first).
    """
    gain, question = find_best_split(rows)

    # Base case: nothing left to learn from asking another question.
    if gain == 0:
        return Leaf(rows)

    matched, unmatched = partition(rows, question)

    # Arguments are evaluated left to right, so the true branch is grown
    # before the false branch.
    return Decision_Node(question, build_tree(matched), build_tree(unmatched))

def print_tree(node, spacing=""):
    """Print the (sub)tree rooted at `node`, indenting each level by two spaces."""
    # A leaf only shows its label counts.
    if isinstance(node, Leaf):
        print(spacing + "Predict", node.predictions)
        return

    print(spacing + str(node.question))

    # Walk the true branch first, then the false branch. The child is
    # fetched only when its turn comes.
    for caption, branch_attr in (('--> True:', 'true_branch'),
                                 ('--> False:', 'false_branch')):
        print(spacing + caption)
        print_tree(getattr(node, branch_attr), spacing + "  ")
    
def print_leaf(counts):
    """A nicer way to print the predictions as percentages at a leaf.

    Args:
        counts: Dictionary of label -> number of occurrences.

    Returns:
        Dictionary of label -> share of the total as a string such as
        "50%", rounded to the nearest whole percent.

    Raises:
        ZeroDivisionError: If `counts` is non-empty but sums to zero.
    """
    total = sum(counts.values()) * 1.0
    # Multiply before dividing and round instead of truncating: with
    # int(count / total * 100), 29 out of 100 came out as "28%" because
    # 0.29 * 100 evaluates to 28.999999999999996.
    return {
        lbl: str(int(round(100.0 * count / total))) + "%"
        for lbl, count in counts.items()
    }

In [None]:
# DEMO:
# 
# Build the tree from our training data, see that indeed the first question being asked is 'Is diameter >= 3'? 
# As we have seen when we tested find_best_split
training_data = [
    ['Green', 3, 'Apple'],
    ['Yellow', 3, 'Apple'],
    ['Red', 1, 'Grape'],
    ['Red', 1, 'Grape'],
    ['Yellow', 3, 'Lemon'],
]
# Grow the tree from the toy dataset; `my_tree` is its root node.
my_tree = build_tree(training_data)
# Print the questions and leaf predictions, indented per tree level.
print_tree(my_tree)

In [None]:
# CONTAINS ASSIGNMENT 6
# 
# Function for classifying a data sample once the tree is built
def classify(sample: Sample, node):
    """Classify `sample` by walking the tree that starts at `node`.

    Args:
        sample: A data row; it is handed to the `match` method of each
            question along the way.
        node: Root of the (sub)tree: a Leaf or a decision node with
            `question`, `true_branch` and `false_branch` attributes.

    Returns:
        The `predictions` dictionary (label -> count) of the leaf that the
        sample ends up in.
    """
    # Base case: we've reached a leaf
    if isinstance(node, Leaf):
        return node.predictions

    # ASSIGNMENT 6
    #
    # Recursive case: ask node.question about the sample and continue in
    # node.true_branch when it matches, in node.false_branch otherwise.
    if node.question.match(sample):
        return classify(sample, node.true_branch)
    return classify(sample, node.false_branch)

# Reminder of what a decision node looks like    
class Decision_Node:
    """Inner node of the tree: one question plus the two subtrees.

    `true_branch` is followed for rows that match the question,
    `false_branch` for all other rows.
    """

    def __init__(self, question: Question, true_branch, false_branch):
        self.question, self.true_branch, self.false_branch = (
            question, true_branch, false_branch)


In [None]:
# TEST ASSIGNMENT 6
# 
# The tree predicts the 1st row of our
# training data is an apple with confidence 1.
first_prediction = classify(training_data[0], my_tree)
print("#", print_leaf(first_prediction))
# On the second example, the confidence is lower
second_prediction = classify(training_data[1], my_tree)
print("#", print_leaf(second_prediction))

# should print: 
# {'Apple': '100%'}
# {'Apple': '50%', 'Lemon': '50%'}

In [None]:
# TEST ALL ASSIGNMENTS
# 
testing_data = [
    ['Green', 3, 'Apple'],
    ['Yellow', 4, 'Apple'],
    ['Red', 2, 'Grape'],
    ['Red', 1, 'Grape'],
    ['Yellow', 3, 'Lemon'],
]
my_tree = build_tree(training_data)
# Show the true label of every test row next to the tree's prediction.
for row in testing_data:
    actual = row[-1]
    predicted = print_leaf(classify(row, my_tree))
    print("# Actual: %s. Predicted: %s" % (actual, predicted))
    
# should print:
# Actual: Apple. Predicted: {'Apple': '100%'}
# Actual: Apple. Predicted: {'Apple': '50%', 'Lemon': '50%'}
# Actual: Grape. Predicted: {'Grape': '100%'}
# Actual: Grape. Predicted: {'Grape': '100%'}
# Actual: Lemon. Predicted: {'Apple': '50%', 'Lemon': '50%'}

In [None]:
# ASSIGNMENT 7
# 
# Try Adding more fruits and/or features to the testing data and see what happens.
header = ["color", "diameter", "label"]
testing_data = [
    ['Green', 3, 'Apple'],
    ['Yellow', 4, 'Apple'],
    ['Red', 2, 'Grape'],
    ['Red', 1, 'Grape'],
    ['Yellow', 3, 'Lemon'],
]
my_tree = build_tree(training_data)
print_tree(my_tree)
# One report line per test row: true label and predicted label shares.
report_line = "Actual: %s. Predicted: %s"
for row in testing_data:
    leaf_counts = classify(row, my_tree)
    print(report_line % (row[-1], print_leaf(leaf_counts)))
# 
# Questions to think about/try:
# 
# What happens if I generate a tree from fine-grained numeric data? Say I have a bunch of example data which includes:
#  - apples with diameters between 500 mm and 700 mm
#  - grapes with diameters between 50 mm and 100 mm
# For which presumably valid diameter/fruit combinations would this algorithm fail? How could it be improved?
# 
# How do I deduce how many labels I can maximally distinguish between from the feature columns?
# 
# What are some other problems you can think of using this type of supervised learning?

In [None]:
# DEMO:
# 
# Lets import a more interesting dataset, namely the iris flower dataset -> 
# https://en.wikipedia.org/wiki/Iris_flower_data_set
from sklearn import datasets
import random

# load data of the iris dataset
iris = datasets.load_iris()

# convert it so it works in our decision tree algorithm:
# every row becomes the feature values followed by the species name
random.seed(1) # set the seed for reproducibility
header = iris.feature_names
iris_data = [
    sample.tolist() + [str(iris.target_names[iris.target[ind]])]
    for ind, sample in enumerate(iris.data)
]
random.shuffle(iris_data)

# split into test and train data:
split_ind = int(len(iris_data)/2)
test_data = iris_data[:split_ind]
training_data = iris_data[split_ind:]

# build the iris tree
iris_tree = build_tree(training_data)
print_tree(iris_tree)

# Evaluate the built tree on the test data.
# Every sample scores the share of its leaf that carries the sample's true
# label. The share is taken from the raw leaf counts rather than from the
# whole-percent strings of print_leaf, so no precision is lost, and the
# label is read from the last column instead of a hard-coded index.
correct_predictions = 0
for sample in test_data:
    leaf_counts = classify(sample, iris_tree)
    correct_predictions += leaf_counts.get(sample[-1], 0) / float(sum(leaf_counts.values()))

print("Percentage correct in test set:", 100 * correct_predictions / split_ind)

In [None]:
# BONUS ASSIGNMENT
# 
# Random forest
import random
import operator

def find_best_split_for_features(rows: Dataset, feature_columns: List[int]) -> (float, Question):
    """Find the best question to ask, looking only at `feature_columns`.

    Variant of find_best_split for feature bagging: instead of iterating
    over all the features, only the listed columns are tried.

    Args:
        rows: The dataset to split.
        feature_columns: Indices of the feature columns that may be used.

    Returns:
        A `(gain, question)` tuple with the highest information gain found
        and the question that produced it. The question is None when no
        candidate divides the rows into two non-empty parts.
    """
    best_gain = 0  # keep track of the best information gain
    best_question = None  # keep track of the feature / value that produced it
    current_uncertainty = gini(rows)

    for col in feature_columns:  # only the selected features
        for val in unique_vals(rows, col):
            question = Question(col, val)
            true_rows, false_rows = partition(rows, question)

            # Skip this split if it doesn't divide the dataset.
            if not true_rows or not false_rows:
                continue

            gain = info_gain(true_rows, false_rows, current_uncertainty)

            # '>=' keeps the tie-breaking identical to find_best_split.
            if gain >= best_gain:
                best_gain, best_question = gain, question

    return best_gain, best_question
    
    
def build_tree_for_features(rows: Dataset, feature_columns: List[int]):
    """Recursively grow a tree that only asks about `feature_columns`.

    Returns a Leaf when the best question found yields no information
    gain, otherwise a Decision_Node whose two subtrees are grown from the
    matching and the non-matching rows with the same feature columns.
    """
    gain, question = find_best_split_for_features(rows, feature_columns)

    # Base case: no question is worth asking any more.
    if gain == 0:
        return Leaf(rows)

    matched, unmatched = partition(rows, question)

    # The true branch is grown first, then the false branch.
    subtree_if_true = build_tree_for_features(matched, feature_columns)
    subtree_if_false = build_tree_for_features(unmatched, feature_columns)
    return Decision_Node(question, subtree_if_true, subtree_if_false)
    
def build_forest(rows: Dataset, features_per_tree: int, num_trees: int) -> List[Decision_Node]:
    """Grow `num_trees` trees, each on its own random choice of features.

    For every tree, `features_per_tree` distinct feature columns are drawn
    with random.sample and the tree is built from all rows using only
    those columns. The trees are returned in the order they were built.
    """
    feature_count = len(rows[0]) - 1  # the last column is the label
    forest = []
    for _ in range(num_trees):
        chosen_columns = random.sample(range(0, feature_count), features_per_tree)
        forest.append(build_tree_for_features(rows, chosen_columns))
    return forest

def classify_random_forest(sample: Sample, nodes: List[object]) -> str:
    """Classify `sample` by a fractional vote over all trees in `nodes`.

    Every tree contributes one vote in total, divided over the labels of
    the leaf the sample reaches in proportion to their counts. The label
    with the largest summed vote is returned; on a tie the label that
    received a vote first wins.
    """
    votes = {}

    for tree in nodes:
        leaf_counts = classify(sample, tree)
        leaf_total = sum(leaf_counts.values())
        # Add a (fractional) vote for every label in this leaf.
        for label, count in leaf_counts.items():
            votes[label] = votes.get(label, 0) + float(count) / leaf_total

    # Pick the label with the biggest amount of votes.
    return max(votes, key=votes.get)

In [None]:
# Try the iris flower dataset with the random forest
# https://en.wikipedia.org/wiki/Iris_flower_data_set
from sklearn import datasets
import random

# Fetch the iris dataset.
iris = datasets.load_iris()

# Reshape it for our tree code: each row is the list of feature values
# followed by the species name.
random.seed(2)  # fixed seed so the shuffle and the forest are reproducible
header = iris.feature_names
iris_data = []
for ind, sample in enumerate(iris.data):
    species = iris.target_names[iris.target[ind]].__str__()
    iris_data.append(sample.tolist() + [species])
random.shuffle(iris_data)

# First half is used for testing, second half for training.
split_ind = len(iris_data) // 2
test_data, training_data = iris_data[:split_ind], iris_data[split_ind:]

# Grow 1000 trees that each use 2 randomly chosen features.
iris_forest = build_forest(training_data, 2, 1000)
correct_predictions = 0
for sample in test_data:
    expected = sample[4]
    result = classify_random_forest(sample, iris_forest)
    print("expected:", expected, "-> result:", result)
    correct_predictions += 1 if expected == result else 0

print("Percentage correct in test set:", 100 * correct_predictions / split_ind)