In [1]:
import numpy as np
import random
from tqdm import tqdm

In [2]:
class DecisionTree(object):
    """
    DecisionTree class, that represents one Decision Tree

    :param max_tree_depth: maximum depth for this tree.
    """
    def __init__(self, max_tree_depth):

        # if not trained return 0.5 probability
        self.tree = [{'proba': 0.5,
                      'node': 'is_terminal',
                      'index': 0}]
        self.max_depth = max_tree_depth

    def dict_of_values(self, data):
        
        """
        :param results: dictionary
        :param data: np.array      
        """
        l1 = len(np.array([data[x] for x in range(len(data)) if data[x][0] == 1]))
        l2 = len(np.array([data[x] for x in range(len(data)) if data[x][0] == 0]))        
        return {'1':l1,'0':l2}
        
    def gini_impurity(self,data1, data2):
        """
        :param data1: 2 dimensional python list or numpy 2 dimensional array
        :param data2: 2 dimensional python list or numpy 2 dimensional array
        :return: float
        """
        data_length = len(data1)+len(data2)
        one_fraction = len(data1)/data_length
        two_fraction = len(data2)/data_length
        Gini = float(one_fraction*(1-one_fraction)+two_fraction*(1-two_fraction))
        return Gini

    def divide_data(self, data, feature_column, feature_val):
        """
        :param data: 2 dimensional python list or numpy 2 dimensional array
        :param feature_column: index ov the column to split the data
        :param feature_val: value of the feature
        :return: data1, data2: 2 2 dimensinal python lists or 2 numpy 2 dimensional arrays
        """
        data1 = [x for x in data if x[feature_column] < feature_val]
        data2 = [x for x in data if x[feature_column] >= feature_val]
        return data1, data2

    def fit(self, X, Y):
        """
        :param X: 2 dimensional python list or numpy 2 dimensional array
        :param Y: 1 dimensional python list or numpy 1 dimensional array
        """
        self.tree = []

        # merge labels and data and put them in a list to iterate over
        my_data = list([np.concatenate((Y,X),axis = 1)])

        # depth of the tree
        depth = 0

        # index for each child node
        index = 0

        # incr to keep track of left and right nodes
        my_subtrees = []
        increment = -1

        my_tree = []
        while depth < self.max_depth+1:

            # temporary list to keep all nodes of the tree on current depth
            my_data_test = []

            # First iteration iterates only on entire data
            # each next iteration iterates over all nodes on current depth
            for data in my_data:

                # calculate Gini impurity for parent node
                d1 = np.array([data[x] for x in range(len(data)) if data[x][0] == 1])
                d2 = np.array([data[x] for x in range(len(data)) if data[x][0] == 0])
                Gp = self.gini_impurity(d1,d2)

                # check if parent node is a terminal node  and jump to next iteration
                # node is terminal when Gini impurity is 0 or max_depth is reached
                if Gp == 0 or depth == self.max_depth:
                    proba = len(d1)/len(data)
                    
                    # add a terminal node to the tree
                    self.tree.append({'proba': proba,
                                    'node': 'is_terminal',
                                    'index': len(self.tree)})
                    continue


                # count of features
                feature_len = len(data[0])-1    # first column of data is label column

                # list of best Gini impurities for each feature
                Ginis = []

                # iterate over features to find best feature to split the data over
                for feature in range(1,feature_len+1):

                    # list of all Gini impurities for current feature
                    feature_Gini = []

                    # list of unique feature values by ascending order
                    sorted_features = sorted(list(set([x[feature] for x in data])))

                    # if there is only one value of feature skip splitting
                    if len(sorted_features) <= 1:
                        continue

                    # mid points for each neighbouring feature values
                    mid_points = [(sorted_features[i]+sorted_features[i+1])/2 for i in range(len(sorted_features)-1)]

                    # iterating over each mid point to find best division for current feature
                    for point in mid_points:

                        # divide data into 2 parts with point
                        child1,child2 = self.divide_data(data,feature,point)

                        # calculate Gini of first child
                        c1d1 = np.array([child1[x] for x in range(len(child1)) if child1[x][0] == 1])
                        c1d2 = np.array([child1[x] for x in range(len(child1)) if child1[x][0] == 0])
                        Gc1 = self.gini_impurity(c1d1,c1d2)

                        # calculate Gini of second child
                        c2d1 = np.array([child2[x] for x in range(len(child2)) if child2[x][0] == 1])
                        c2d2 = np.array([child2[x] for x in range(len(child2)) if child2[x][0] == 0])
                        Gc2 = self.gini_impurity(c2d1,c2d2)

                        # calculate information gain
                        IG = Gp-len(child1)/len(data)*Gc1-len(child2)/len(data)*Gc2

                        # append IG and division point and index of feature for current feature
                        feature_Gini.append([IG,point,feature])

                    # after iterating on all mid points find the best division point by IG
                    # append max IG to overall Ginis for current feature
                    Ginis.append(max(feature_Gini))

                # after iterating over all features find best division point for current node    
                # best division point by IG
                best_point = max(Ginis)

                # add probability of being from class 1.0
                proba = len(np.array([data[x] for x in range(len(data)) if data[x][0] == 1]))/len(data)
                
                # create a node
                node = {'feature': best_point[2]-1,
                        'value': best_point[1],
                        'proba': proba,
                        'node': 'is_not_terminal',
                        'index': len(self.tree),
                        'ltree': index+1,
                        'rtree': index+2}
                index += 2
                # add the node to the tree
                self.tree.append(node)

                # divide for the best point
                best_div = self.divide_data(data,best_point[2],best_point[1])

                # append childs 
                my_data_test.append(best_div[0])
                my_subtrees.append('ltree')
                my_data_test.append(best_div[1])
                my_subtrees.append('rtree')
                increment += 1

            # move to next iteration going to the next depth nodes of the tree   
            # pass all childs to my_data to iterate over them on next iteration and increase depth
            my_data = my_data_test
            depth += 1
            
    def predict(self, X):
        """
        :param X: 2 dimensional python list or numpy 2 dimensional array
        :return: Y: 1 dimension python list with labels
        """
        # initialize array to store the scores
        result = np.array([])
        for data_point in X:
            k = 0
            while True:
                # start from first node if it is a terminal node return probability
                if self.tree[k]['node'] == 'is_terminal':
                    proba = self.tree[k]['proba']
                    result = np.append(result,proba)
                    break
                else:
                    # check if the input goes to left subtree or right subtree
                    # update k to be the subtree the input goes into
                    if data_point[self.tree[k]['feature']] < self.tree[k]['value']:
                        k = self.tree[k]['ltree']
                    else:
                        k = self.tree[k]['rtree']
                # repeat the procces until a terminal node is reached

        return result.reshape(X.shape[0],1)


In [3]:
class RandomForest(object):
    """
    RandomForest a class, that represents Random Forests.

    :param num_trees: Number of trees in the random forest
    :param max_tree_depth: maximum depth for each of the trees in the forest.
    :param ratio_per_tree: ratio of points to use to train each of
        the trees.
    """
    def __init__(self, num_trees, max_tree_depth, ratio_per_tree=0.5):
        self.num_trees = num_trees
        self.max_tree_depth = max_tree_depth
        self.ratio_per_tree = ratio_per_tree
        self.trees = None

    def fit(self, X, Y):
        """
        :param X: 2 dimensional python list or numpy 2 dimensional array
        :param Y: 1 dimensional python list or numpy 1 dimensional array
        """
        self.trees = []
        
        for t in tqdm(range(self.num_trees)):
            
            # initialize a decision tree
            clf = DecisionTree(self.max_tree_depth)
            
            # randomly shuffle the data
            idx = np.random.randint(len(X), size=int(len(X)*self.ratio_per_tree))
            Xr = X[idx,:]
            Yr = Y[idx,:]
            
            # fit a decision tree
            clf.fit(Xr,Yr)
            
            # append each tree to tree list
            self.trees.append(clf)

        return self.trees
            
    def predict(self, X):
        """
        :param X: 2 dimensional python list or numpy 2 dimensional array
        :return: (Y, conf), tuple with Y being 1 dimension python
        list with labels, and conf being 1 dimensional list with
        confidences for each of the labels.
        """
        Y = []
        conf = []
        
        for x in X:
            
            # make votes for each data point in X
            voting = [1 if t.predict(np.array([x]))>=0.5 else 0 for t in self.trees ]
            
            # find class with most votes
            y = max(set(voting), key=voting.count)
            Y.append(y)
            
            # find the confidence of votes, i.e. what proportion of trees voted for the winning class
            c = voting.count(y)/len(self.trees)
            conf.append(c)
            
        return (np.array(Y).reshape(len(Y),1), np.array(conf).reshape(len(Y),1))

In [4]:
def accuracy_score(Y_true, Y_predict):
    """
    Calculates what portion of the data had the correct label predicted.
    :param Y_true: list or numpy array of true labels
    :param Y_predict: list or numpy array of predicted labels
    """
    try:
        acc = dict(zip(*np.unique((Y_true == Y_predict), return_counts=True)))[True]/len(Y_true)
    # in case there is no True label
    except:
        acc = 0
    
    return acc

In [5]:
# Load Data
filename = 'SPECTF.dat'
data = np.loadtxt(filename, delimiter=',')
X = data[:, 1:]
y = np.array([data[:, 0]]).T
n, d = X.shape

# Shuffle data
c = list(zip(X, y))
random.shuffle(c)
Xt, yt = zip(*c)
X = np.array(Xt)
y = np.array(yt)

# Define train and test sets
train_size  = int(0.75*len(X))
Xtrain = X[1:train_size, :]  # train on 75% of data
Xtest = X[train_size:, :]
ytrain = y[1:train_size, :]  # test on 25% of data
ytest = y[train_size:, :]

In [6]:
# train Decision Tree
print('fitting Dt')
dt = DecisionTree(7)
dt.fit(Xtrain, ytrain)

# output predictions on the remaining data
y_pred_dt = dt.predict(Xtest)
accuracy_dt = accuracy_score(ytest, y_pred_dt)
print('train accuracy Dt: ', accuracy_score(ytrain, dt.predict(Xtrain)))
print('test accuracy Dt: ', accuracy_dt,'\n')

# train on Random Forest
print('fitting Rf')
rf = RandomForest(25, 7)
forest = rf.fit(Xtrain, ytrain)

# output predictions on the remaining data
y_pred_rf = rf.predict(Xtest)
accuracy_rf = accuracy_score(ytest, y_pred_rf[0])
print('train accuracy Rf: ', accuracy_score(ytrain, rf.predict(Xtrain)[0]))
print('test accuracy Rf: ', accuracy_rf,'\n')

fitting Dt
train accuracy Dt:  0.8391959798994975
test accuracy Dt:  0.6268656716417911 

fitting Rf


100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 25/25 [00:17<00:00,  1.43it/s]


train accuracy Rf:  0.9195979899497487
test accuracy Rf:  0.7761194029850746 

