Made by: *Vincent Kwan*

In [216]:
#The first row is always the Headers
#The last column is always the label

training_data = []
with open('test.txt', 'r') as f:
    training_data = f.read().splitlines()
training_data = [x.split(',') for x in training_data]


In [217]:
#print(training_data)
print(len(training_data))

6


In [218]:
headers = training_data[0]

In [219]:
def uniques(rows, col):
    #return unique values for columns - skip the headers
    return set(row[col] for row in rows[1:])

In [220]:
uniques(training_data, 0)

{'Green', 'Red', 'Yellow'}

In [221]:
def countlabels(rows):
    #count the number of labels in a node
    dict_label = dict()
    for row in rows:
        if row[-1] not in dict_label:
            dict_label[row[-1]] = 0
        dict_label[row[-1]] += 1
    return dict_label

In [222]:
countlabels(training_data[1:])

{' Apple': 2, ' Grape': 2, ' Lemon': 1}

In [223]:
def gini_impurity(rows):
    #calculate the gini impurity for a node
    counts = countlabels(rows)
    total = sum(counts.values())
    impurity = 1
    for label in counts:
        prob = counts[label]/total
        prob = prob ** 2
        impurity -= prob
    return impurity

In [224]:
def weighted_impurity(lrow, rrow):
    #calculate weighted impurity of child nodes
    ltotal = len(lrow)
    rtotal = len(rrow)
    total = ltotal + rtotal
    w_imp = ((ltotal/total) * gini_impurity(lrow)) + ((rtotal/total) * gini_impurity(rrow))
    return w_imp

In [225]:
#testing if the function works, no mixing:
test1 = [['Red', 'Color'], ['Yellow', 'Color']]
gini_impurity(test1)

0.0

In [226]:
#some mixing
test2 = [['Ravioli', 'Food'], ['Paracetamol', 'Medicine'], ['Pug', 'Dog']]
gini_impurity(test2)

0.6666666666666665

In [227]:
class Condition:
    ''' A condition which is used to divide values into child nodes.'''
    def __init__(self, col, val):
        self.value = val
        self.column = col
    
    def is_match(self, row):
        test_val = row[self.column]

        if isinstance(test_val, float) or isinstance(test_val, int):
            #if data is of numeric type, then check whether bigger than or equal to
            return test_val >= self.value
        else:
            #otherwise, if data is of character type, then check equality
            return test_val == self.value

    def __repr__(self):
        if isinstance(self.value, float) or isinstance(self.value, int):
            operator = ' >= '
        else:
            operator = ' == '
        
        return (str(headers[self.column]) + operator + str(self.value))

In [228]:
#testing the Condition class
C = Condition(1, 'Biden')
trow = ['President', 'Biden']
C.is_match(trow)

True

In [229]:
def partitioner(rows, condition):
    #Partition the dataset based upon the condition
    #just like how the british divided India and Pakistan
    leftrows, rightrows = [], []
    for row in rows:
        if condition.is_match(row):
            rightrows.append(row)
        else:
            leftrows.append(row)
    return leftrows, rightrows

In [230]:
#testing 
lr, rr = partitioner(training_data, Condition(0, 'Red'))
weighted_impurity(lr, rr)

0.41666666666666663

In [231]:
lr

[['Color', ' Diameter', ' Label'],
 ['Green', ' 3', ' Apple'],
 ['Yellow', ' 3', ' Apple'],
 ['Yellow', ' 3', ' Lemon']]

In [232]:
rr

[['Red', ' 1', ' Grape'], ['Red', ' 1', ' Grape']]

In [233]:
lr, rr = partitioner(training_data, Condition(0, 'Green'))
weighted_impurity(lr, rr)

0.5999999999999999

In [234]:
lr

[['Color', ' Diameter', ' Label'],
 ['Yellow', ' 3', ' Apple'],
 ['Red', ' 1', ' Grape'],
 ['Red', ' 1', ' Grape'],
 ['Yellow', ' 3', ' Lemon']]

In [235]:
rr

[['Green', ' 3', ' Apple']]

In [236]:
#Test whether partitioner works
#All rows whose 1st column value is Square should be put in rrows <- true if using source.txt
lrow, rrow = partitioner(training_data[1:], Condition(0, 'Square'))
rrow

[]

In [237]:
def splitter(rows):
    '''Finds the best condition to split the dataset on.'''
    best_con = None
    bg = 0
    curr_gini = gini_impurity(rows)
    
    for c in range(len(headers) - 1): #ignore the last column since it contains label
        values = uniques(rows, c)

        for v in values:
            condition = Condition(c, v)
            lrows, rrows = partitioner(rows, condition)

            if len(lrows) == 0 or len(rrows) == 0:
                continue
            
            w_imp = weighted_impurity(lrows, rrows)

            info_gain = curr_gini - w_imp

            if info_gain > bg:
                best_con = condition
                bg = info_gain

    return bg, best_con

In [238]:
#testing info_gain and best condition
bi, bc = splitter(training_data)
print(bi, bc)

0.3055555555555556 Color == Red


In [239]:
class Node():
    def __init__(self, rows):
        self.labels = countlabels(rows)
        self.dprob = dict()

        self.total = sum(self.labels.values())
        for label in self.labels.keys():
            #print(label)
            self.dprob[label] = self.labels[label]/self.total

In [240]:
class LeafNode(Node):
    def __init__(self, rows):
        super(LeafNode, self).__init__(rows)
    
    def state(self):
        return 'I am a leaf node.'

In [241]:
class InternalNode(Node):
    def __init__(self, lrows, rrows, cond):
        self.state = 'I am a branch'
        self.condition = cond
        self.lbranch = lrows
        self.rbranch = rrows
    
    def state(self):
        return 'I am an internal node.'

In [242]:
def treebuilder(rows):
    gain, condition = splitter(rows)

    if gain == 0:
        return LeafNode(rows)

    lrows, rrows = partitioner(rows, condition)

    lbranch = treebuilder(lrows)
    rbranch = treebuilder(rrows)

    return InternalNode(lbranch, rbranch, condition)

In [243]:
tree = treebuilder(training_data[1:])

In [244]:
def print_tree(node, spacing=""):

    if isinstance(node, LeafNode):
        print (spacing + "Predict", node.dprob)
        return


    print (spacing + str(node.condition))


    print (spacing + '--> True:')
    print_tree(node.rbranch, spacing + "  ")


    print (spacing + '--> False:')
    print_tree(node.lbranch, spacing + "  ")

In [256]:
def print_leaf(counts):

    total = sum(counts.values())
    probs = {}
    for lbl in counts.keys():
        probs[lbl] = counts[lbl]/total
    return probs

In [245]:
print_tree(tree)

Color == Red
--> True:
  Predict {' Grape': 1.0}
--> False:
  Color == Yellow
  --> True:
    Predict {' Apple': 0.5, ' Lemon': 0.5}
  --> False:
    Predict {' Apple': 1.0}


In [247]:
def classifier(row, treenode):
    '''Returns leaf nodes. Taken from Josh Gordon.'''
    if isinstance(treenode, LeafNode):
        return treenode.dprob

    if treenode.condition.is_match(row):
        return classifier(row, treenode.rbranch)
    else:
        return classifier(row, treenode.lbranch)

In [246]:
testing_data = [
    ['Green', 3, 'Apple'],
    ['Yellow', 4, 'Apple'],
    ['Red', 2, 'Grape'],
    ['Red', 1, 'Grape'],
    ['Yellow', 3, 'Lemon'],
]

In [257]:
for row in testing_data:
    print("Actual: {0}. Predicted: {1}".format(row[-1], print_leaf(classifier(row, tree))))

Actual: Apple. Predicted: {' Apple': 1.0}
Actual: Apple. Predicted: {' Apple': 0.5, ' Lemon': 0.5}
Actual: Grape. Predicted: {' Grape': 1.0}
Actual: Grape. Predicted: {' Grape': 1.0}
Actual: Lemon. Predicted: {' Apple': 0.5, ' Lemon': 0.5}
