In [1]:
import numpy as np
import pandas as pd
from sklearn.utils import shuffle
import os

In [2]:
# Read the semicolon-delimited CSV into a DataFrame. Later cells read the
# `quality` column of `data` as the class label and treat every other column
# as a feature. The path is relative to the notebook's working directory.
datafile='winequality-white.csv'
data=pd.read_csv(datafile, sep=';')

In [3]:
class Datasets:
    """Shuffle a DataFrame once and cut it into training/test/validation parts.

    After construction, ``data_sets_`` maps each entry of ``names`` to a
    two-element list ``[X, labels]`` where ``X`` holds every column except
    ``quality`` (as a NumPy array) and ``labels`` holds the ``quality`` column.
    """

    # Order matters: proportions[i] sizes the split called names[i].
    names = ["training", "test", "validation"]

    def __init__(self, data, proportions=(5, 1, 1), random_state=None):
        """Compute the split sizes and build the splits.

        Parameters
        ----------
        data : pandas.DataFrame
            Must contain a ``quality`` column. It is not modified.
        proportions : sequence of 3 numbers
            Relative sizes of the training, test and validation splits.
        random_state : int, RandomState instance or None
            Forwarded to ``sklearn.utils.shuffle``. ``None`` (the default)
            keeps the previous non-deterministic behaviour; pass an int to
            get the same split on every run.

        Raises
        ------
        AssertionError
            If ``proportions`` does not have exactly three entries.
        """
        assert(len(proportions) == 3)
        self.random_state = random_state
        self.data_sets_ = dict()
        fractions = np.asarray(proportions) / np.sum(proportions)
        # Sizes are truncated towards zero, so up to a few trailing rows of
        # the shuffled frame may end up in no split at all.
        self.set_len = (fractions * data.shape[0]).astype(int)
        # No defensive copy is needed: shuffle() and drop() both return new
        # objects, so the caller's frame is never touched.
        self.create_dataset(data)

    def create_dataset(self, data):
        """Shuffle ``data`` and fill ``data_sets_`` with consecutive slices."""
        self.columns_name = data.columns
        data = shuffle(data, random_state=self.random_state)
        labels = data.quality.values
        X = data.drop(["quality"], axis=1).values
        start = 0
        for name, length in zip(Datasets.names, self.set_len):
            end = start + length
            self.data_sets_[name] = [X[start:end, :], labels[start:end]]
            start = end

In [4]:
def get_entropy(y, classes):
    """Return the base-2 entropy of the labels in ``y``.

    ``classes`` lists the label values to look for; the frequency of each one
    in ``y`` is used as its probability. Classes that do not occur in ``y``
    are skipped, so they contribute nothing to the sum.
    """
    total = y.shape[0]
    frequencies = np.asarray([np.sum(y == label) / total for label in classes])
    present = frequencies[frequencies > 0]
    return -np.sum(present * np.log2(present))

def split_entropy(node_probs, weights):
    """Return the weighted average of the base-2 entropies of several nodes.

    NOTE(review): the previous body referenced an undefined name ``probs`` and
    could not run; the semantics below (one probability row per node, one
    weight per node) are the presumed intent - confirm against the caller.

    Parameters
    ----------
    node_probs : array-like
        Class probabilities, one row per node. A 1-D input is treated as a
        single node.
    weights : array-like or None
        One weight per node, passed to ``np.average``. ``None`` gives a plain
        mean.

    Returns
    -------
    float
        The weighted mean of the per-node entropies.
    """
    node_probs = np.atleast_2d(np.asarray(node_probs, dtype=float))
    # Replace non-positive entries by 1 inside the logarithm: log2(1) == 0, so
    # those entries add nothing and no log(0) warning is emitted.
    safe = np.where(node_probs > 0, node_probs, 1.0)
    node_entropy = -np.sum(node_probs * np.log2(safe), axis=1)
    return np.average(node_entropy, weights=weights)

def order_choice(X_data, no_attr):
    """Pick the first ``no_attr`` column indices, in ascending order.

    ``X_data`` is not used; the parameter exists so this function can be
    called with the same arguments as ``random_choice``.
    """
    attribute_indices = np.arange(no_attr)
    return attribute_indices

# Shared generator for feature sampling. It is seeded once at definition time,
# so successive calls to random_choice() walk through one fixed sequence.
rng_features = np.random.RandomState(seed = 150)

def random_choice(X_data, no_attr):
    """Draw ``no_attr`` distinct column indices of ``X_data`` at random.

    The indices are sampled without replacement from ``range(X_data.shape[1])``
    using the module-level ``rng_features`` generator and returned sorted.
    """
    n_columns = X_data.shape[1]
    sampled = rng_features.choice(n_columns, size=no_attr, replace=False)
    return np.unique(sampled)

In [5]:
class Node:
    """One node of a binary decision tree.

    A node starts out as an unconfigured internal node and is then turned into
    either a splitting node (``childs`` holds the sub-nodes and ``column`` /
    ``splitting_value`` describe the test) or a leaf (``childs["label"]``
    holds the predicted class and ``isLeaf`` is True).
    """

    def __init__(self):
        # -1 marks "not set yet" for both the column index and the threshold.
        self.childs = {}
        self.column = -1
        self.isLeaf = False
        self.splitting_value = -1

    def set_as_splitting_node(self, value, childs, column):
        """Store the threshold, the child mapping and the tested column."""
        self.splitting_value, self.childs, self.column = value, childs, column

    def set_as_leaf(self, label):
        """Mark the node as a leaf that predicts ``label``."""
        self.childs["label"] = label
        self.isLeaf = True

In [6]:
class DecisionTree:
    """Binary decision tree for numeric features, grown by minimising entropy.

    Every internal node tests ``x[column] < splitting_value``; rows for which
    the test holds go to the ``"left"`` child, all others to ``"right"``.
    Leaves store their predicted class under ``childs["label"]``.
    """

    def __init__(self, labels, attr_choose_alg, bag):
        """Configure an untrained tree.

        Parameters
        ----------
        labels : array-like
            Class labels; only the distinct values are kept (``self.classes``).
        attr_choose_alg : callable
            Called as ``attr_choose_alg(X, bag)`` at every node; must return
            the column indices to consider for the split.
        bag : int
            Forwarded unchanged as the second argument of ``attr_choose_alg``.
        """
        self.classes = np.unique(labels)
        self.bag = bag
        self.attr_choose_alg = attr_choose_alg
        self.root = None

    def train(self, train_data, labels):
        """Grow the tree on ``train_data`` / ``labels`` and store its root."""
        self.root = self.create_tree(train_data, labels)

    def get_class_counts(self, y):
        """Return an array whose entry ``k`` is how often label ``k`` occurs in ``y``.

        Labels are used directly as indices, so they must be non-negative
        integers. The result has at least 10 entries (the historic fixed size)
        and grows when a larger label is present instead of raising IndexError.
        """
        idx, counts = np.unique(y, return_counts=True)
        size = 10
        if idx.shape[0] > 0:
            size = max(size, int(idx.max()) + 1)
        class_counts = np.zeros(size)
        class_counts[idx] = counts
        return class_counts

    def find_split_attribute(self, X, y):
        """Search the candidate columns for the split with the lowest entropy.

        Candidate thresholds are the midpoints between consecutive distinct
        values of a column, so every threshold leaves rows on both sides.

        Returns
        -------
        (column, threshold)
            ``(-1, -1)`` when no candidate column has two distinct values.
        """
        best_entropy = np.inf
        xt_best = -1
        no_column = -1
        n_samples = y.shape[0]
        for i in self.attr_choose_alg(X, self.bag):
            # np.unique already returns the distinct values in sorted order.
            distinct = np.unique(X[:, i])
            thresholds = (distinct[1:] + distinct[:-1]) / 2
            for val in thresholds:
                split_mask = X[:, i] < val
                left_card = int(np.sum(split_mask))
                prob_l = left_card / n_samples
                prob_r = (n_samples - left_card) / n_samples
                entropy = prob_l * get_entropy(y[split_mask], self.classes)
                entropy += prob_r * get_entropy(y[~split_mask], self.classes)
                # Strict comparison: on ties the first candidate found wins.
                if entropy < best_entropy:
                    best_entropy = entropy
                    xt_best = val
                    no_column = i
        return no_column, xt_best

    def split_data_(self, data, labels, column, split_val):
        """Partition rows by ``data[:, column] < split_val``.

        Returns ``(left_data, left_labels, right_data, right_labels)`` where
        "left" holds the rows that satisfy the comparison.
        """
        mask = data[:, column] < split_val
        return data[mask], labels[mask], data[~mask], labels[~mask]

    def create_tree(self, X, y):
        """Recursively build the subtree for the samples ``X`` / ``y``.

        A node becomes a leaf when all labels agree, or when no usable split
        exists; in the latter case it predicts the majority class of ``y``
        (ties go to the smallest label, see ``vote``).
        """
        classes = np.unique(y)
        node = Node()
        if len(classes) == 1:
            node.set_as_leaf(classes[0])
            return node

        attribute, xt = self.find_split_attribute(X, y)
        # attribute == -1 means "no split found"; do not reinterpret it as an
        # index of the last column.
        if attribute >= 0:
            left_X, left_y, right_X, right_y = self.split_data_(X, y, attribute, xt)
            if left_X.shape[0] > 0 and right_X.shape[0] > 0:
                left_child = self.create_tree(left_X, left_y)
                right_child = self.create_tree(right_X, right_y)
                node.set_as_splitting_node(
                    xt, {"left": left_child, "right": right_child}, attribute
                )
                return node

        # Mixed labels that cannot be separated: predict the most frequent one.
        node.set_as_leaf(self.vote(y))
        return node

    def make_decision(self, data):
        """Walk from the root to a leaf for one sample and return its label."""
        curr_node = self.root
        while not curr_node.isLeaf:
            if data[curr_node.column] < curr_node.splitting_value:
                curr_node = curr_node.childs["left"]
            else:
                curr_node = curr_node.childs["right"]
        return curr_node.childs["label"]

    def predict(self, data):
        """Return the predicted label of every row of the 2-D array ``data``."""
        return np.array([self.make_decision(data[i, :]) for i in range(data.shape[0])])

    def vote(self, votes):
        """Return the most frequent value in ``votes``.

        ``np.unique`` sorts the values and ``np.argmax`` returns the first
        maximum, so ties are resolved in favour of the smallest value.
        """
        classes, countes = np.unique(votes, return_counts=True)
        return classes[np.argmax(countes)]

    def find_node_before_leaf(self, current_node, node_list, X, y):
        """Collect the internal nodes whose two children are both leaves.

        ``X`` / ``y`` are routed down the tree alongside the traversal. Each
        collected node is appended to ``node_list`` and gets the majority
        class of the samples that reach it stored under ``childs["label"]``.
        Nodes reached by no sample are not collected, because no majority
        class can be determined for them.

        Returns True when ``current_node`` itself is a leaf, False otherwise.
        """
        if current_node.isLeaf:
            return True

        left_X, left_y, right_X, right_y = self.split_data_(
            X, y, current_node.column, current_node.splitting_value
        )
        left_is_leaf = self.find_node_before_leaf(
            current_node.childs["left"], node_list, left_X, left_y
        )
        right_is_leaf = self.find_node_before_leaf(
            current_node.childs["right"], node_list, right_X, right_y
        )

        if left_is_leaf and right_is_leaf and y.shape[0] > 0:
            current_node.childs["label"] = self.vote(y)
            node_list.append(current_node)
        return False

    def _accuracy(self, X, y):
        """Fraction of rows of ``X`` whose prediction equals ``y``."""
        return np.sum(y == self.predict(X)) / y.shape[0]

    def prune_tree(self, x_train, y_train, x_val, y_val, eps = 0):
        """Collapse bottom-level splits that do not help validation accuracy.

        In each pass, every node whose children are both leaves is temporarily
        turned into a leaf predicting the majority training class. The change
        is kept when validation accuracy drops by at most ``eps``; otherwise
        the node is restored. Passes repeat until one of them prunes nothing.

        Parameters
        ----------
        x_train, y_train : arrays
            Samples used to determine the majority class at each candidate.
        x_val, y_val : arrays
            Samples used to measure accuracy before and after each collapse.
        eps : float
            Largest accuracy loss that is still accepted.
        """
        pruned = True
        while pruned:
            pruned = False
            node_list = []
            self.find_node_before_leaf(self.root, node_list, x_train, y_train)
            acc1 = self._accuracy(x_val, y_val)
            for node in node_list:
                # Trial: make_decision() stops at this node while isLeaf is set.
                node.isLeaf = True
                acc2 = self._accuracy(x_val, y_val)
                if acc1 - acc2 <= eps:
                    acc1 = acc2
                    node.childs.pop("left")
                    node.childs.pop("right")
                    pruned = True
                else:
                    node.isLeaf = False
                    node.childs.pop("label")

In [7]:
def evaluate_tree(model, x_set, y_set):
    """Predict ``x_set`` with ``model`` and score the result against ``y_set``.

    ``model`` only needs a ``predict(x_set)`` method. Returns the pair
    ``(predictions, accuracy)`` where accuracy is the share of exact matches.
    """
    predictions = model.predict(x_set)
    n_correct = np.sum(y_set == predictions)
    return predictions, n_correct / y_set.shape[0]

In [8]:
# Experiment: train one decision tree on a fresh split and report its accuracy
# on each of the three splits.

# Relative sizes of the training / test / validation splits.
proportion=[5,1,1]
# Accuracy loss tolerated by prune_tree (only used if pruning is re-enabled below).
eps=0

# Each Datasets() call reshuffles, so the split differs from run to run.
datasets = Datasets(data, proportion)
x_train, y_train = datasets.data_sets_["training"]
x_test, y_test = datasets.data_sets_["test"]
x_val, y_val = datasets.data_sets_["validation"]
  
# order_choice with bag == number of columns: every feature is considered at every node.
decision_tree = DecisionTree(y_train, order_choice, x_train.shape[1])
decision_tree.train(x_train, y_train)
# Pruning is currently disabled; uncomment the next line to prune against the validation split.
#decision_tree.prune_tree(x_train, y_train, x_val, y_val, eps)

# evaluate_tree returns (predictions, accuracy); index [1] below is the accuracy.
acc_train = evaluate_tree(decision_tree, x_train, y_train)
acc_test = evaluate_tree(decision_tree, x_test, y_test)
acc_validation = evaluate_tree(decision_tree, x_val, y_val)

print("train: ",acc_train[1])
print("test: ",acc_test[1])
print("validation: ",acc_validation[1])

train:  1.0
test:  0.586552217453505
validation:  0.5937052932761088


In [None]:
class RandomForest():
    """Ensemble of DecisionTree models trained on bootstrap samples.

    Each tree is fitted on rows drawn with replacement from the training set;
    predictions are combined by a per-sample majority vote.
    """

    def __init__(self, labels, attr_choose_alg, trees, feature_bag):
        """Configure an untrained forest.

        Parameters
        ----------
        labels : array-like
            Class labels; only the distinct values are kept.
        attr_choose_alg : callable
            Passed to every DecisionTree to choose candidate columns per node.
        trees : int
            Number of trees to train.
        feature_bag : int
            Passed to every DecisionTree as its ``bag`` argument.
        """
        self.bag = feature_bag
        self.classes = np.unique(labels)
        self.trees = trees
        self.attr_choose_alg = attr_choose_alg
        # The trained trees. The attribute keeps its historic name ("tries").
        self.tries = []

    def train(self, X, y, seed=64):
        """Fit ``self.trees`` trees, each on its own bootstrap sample of ``X`` / ``y``.

        Any trees from an earlier call are discarded, so retraining replaces
        the forest instead of appending to it.

        Parameters
        ----------
        X, y : arrays
            Training samples and their labels.
        seed : int
            Seed of the generator that draws the bootstrap row indices.
        """
        rng = np.random.RandomState(seed=seed)
        n_samples = X.shape[0]
        fitted = []
        for _ in range(self.trees):
            idx = rng.choice(n_samples, size=n_samples, replace=True)
            tree = DecisionTree(self.classes, self.attr_choose_alg, self.bag)
            tree.train(X[idx], y[idx])
            fitted.append(tree)
        self.tries = fitted

    def vote(self, prediction):
        """Return the most frequent value in ``prediction``.

        Ties go to the smallest value, because ``np.unique`` sorts the values
        and ``np.argmax`` returns the first maximum.
        """
        classes, counts = np.unique(prediction, return_counts=True)
        return classes[np.argmax(counts)]

    def predict(self, X):
        """Return the majority-vote label for every row of ``X``.

        Raises
        ------
        RuntimeError
            If the forest holds no trained trees.
        """
        if not self.tries:
            raise RuntimeError("RandomForest has no trained trees; call train() first")
        # Shape (n_trees, n_samples); transposing yields one row of votes per sample.
        tries_predictions = np.array([tree.predict(X) for tree in self.tries])
        predicted_classes = np.array([self.vote(votes) for votes in tries_predictions.T])
        return predicted_classes

In [None]:
# Experiment: train a random forest on a fresh split and report its accuracy
# on each of the three splits.

# Relative sizes of the training / test / validation splits.
proportion=[5,1,1]
# Number of trees in the forest.
trees = 30

# Each Datasets() call reshuffles, so this split differs from the one used for
# the single decision tree.
datasets = Datasets(data, proportion)
x_train, y_train = datasets.data_sets_["training"]
x_test, y_test = datasets.data_sets_["test"]
x_val, y_val = datasets.data_sets_["validation"]

# Every node considers a random subset of about sqrt(n_features) columns.
rf = RandomForest(y_train, random_choice, trees, int(np.sqrt(x_train.shape[1])))
rf.train(x_train, y_train)

# evaluate_tree returns (predictions, accuracy); index [1] below is the accuracy.
acc_train = evaluate_tree(rf, x_train, y_train)
acc_test = evaluate_tree(rf, x_test, y_test)
# Score the forest on the validation split as well. Without this line the
# print below would show the value left over from the single-tree cell (or
# fail on a fresh kernel if that cell was not run).
acc_validation = evaluate_tree(rf, x_val, y_val)

print("train: ",acc_train[1])
print("test: ",acc_test[1])
print("validation: ",acc_validation[1])