C4.5決策樹實現_Function

In [None]:
import collections
import math
import numpy as np
import pandas as pd
import sklearn
from sklearn.utils import shuffle


class Node:
    """One vertex of the decision tree: a data partition plus its split metadata."""

    def __init__(self, x, y, attribute_list, node_type):
        # Data partition stored at this node and the attributes handed to it.
        self.data = x
        self.labels = y
        self.attributes_list = attribute_list
        self.node_type = node_type

        # Split description; filled in by the tree builder after construction.
        self.best_attribute = None
        self.split_criterion = None
        self.split_up_down = None
        self.leaf_label = None

        # Position within the tree.
        self.depth = 0
        self.parent = None
        self.children = []

    def __lt__(self, other):
        # Nodes are ordered by depth only, so a list of nodes can be sorted.
        return self.depth < other.depth

    def predict_leaf_class(self):
        """
            Stores the most frequent label of this node's partition in ``leaf_label`` and
            returns it. On a tie the label seen first wins.
        :return: pred_class
        """
        tally = collections.Counter(self.labels)
        self.leaf_label = max(tally, key=lambda label: tally[label])
        return self.leaf_label

    def print_node(self):
        """
            Writes a one-line summary of the node to stdout.
        """
        fields = (
            'best att-', self.best_attribute,
            'split_crit-', self.split_up_down, self.split_criterion,
            'type-', self.node_type,
            'depth-', self.depth,
            'class label-', self.leaf_label,
        )
        print(*fields)

    def copy(self):
        # Placeholder: not implemented, always returns None.
        return None


class C45Tree:
    def __init__(self, attributes, data):
        self.tree_nodes = []
        self.depth = 0
        self.num_leaves = 0
        self.root_node = None
        self.attributes = attributes[:-1]
        self.dataset = data

    def train(self, x_train, y_train):
        """
            Entry point for training: builds the root node from the full training partition,
            registers it in ``tree_nodes`` and starts the recursive growth from it.
        :param x_train: training features
        :param y_train: training labels
        """
        root = Node(x_train, y_train, self.attributes, 'root')
        self.root_node = root
        self.tree_nodes.append(root)
        # The root is the "previous node" for the first level of recursion.
        self.grow_tree(root, self.attributes, (x_train, y_train))

    def grow_tree(self, prev_node, attribute_list, D):
        """
            Uses C4.5 decision tree algorithm to grow a tree during training, based on pseudocode from [1].

            Every node created here is appended to ``self.tree_nodes`` and linked into the
            ``children`` list of ``prev_node`` (or, for the per-value ``N_V`` nodes, of the node
            they are attached to below).
        :param attribute_list: candidate attribute names; NOTE this list is mutated in place
                               (the chosen attribute is removed), and ``train`` passes
                               ``self.attributes`` itself, so the removal is visible to every
                               other branch that shares the list
        :param D: tuple ``(features, labels)`` for the partition being split
        :param prev_node: node the new node is attached to
        :return: N, the new node
        """
        # Make sure the node we are growing from is linked to its own parent.
        if prev_node is not None and prev_node.parent is not None:
            if prev_node not in prev_node.parent.children:
                prev_node.parent.children.append(prev_node)

        # check for termination cases
        # check if all tuples in D are in the same class
        if self.check_same_class_labels(D[1]):
            N = Node(D[0], D[1], attribute_list, 'leaf')
            N.depth = prev_node.depth + 1
            N.predict_leaf_class()  # determine the class of the leaf
            # The leaf inherits the split description of the node it hangs from.
            N.best_attribute = str(prev_node.best_attribute)
            N.split_up_down = prev_node.split_up_down
            N.split_criterion = prev_node.split_criterion
            self.tree_nodes.append(N)
            prev_node.children.append(N)
            N.parent = prev_node
            return N

        # check if attribute list is empty, do majority voting on class
        if not attribute_list:
            N = Node(D[0], D[1], attribute_list, 'leaf')
            N.depth = prev_node.depth + 1
            N.predict_leaf_class()  # determine the class of the leaf
            N.best_attribute = str(prev_node.best_attribute)
            N.split_criterion = prev_node.split_criterion
            N.split_up_down = prev_node.split_up_down
            self.tree_nodes.append(N)
            prev_node.children.append(N)
            N.parent = prev_node
            return N

        # create new node
        N = Node(D[0], D[1], attribute_list, 'node')
        N.depth = prev_node.depth + 1
        N.parent = prev_node
        # conduct attribute selection method, label node with the criterion
        best_attribute, crit_split_val = self.attribute_selection_method(D, attribute_list)

        N.best_attribute = best_attribute  # label node with best attribute
        N.split_criterion = crit_split_val  # for discrete
        if best_attribute == '':
            # early stop: no attribute produced a positive gain ratio, so this becomes a
            # leaf that votes over the partition stored on prev_node (not over D)
            N.best_attribute = str(best_attribute)
            N.split_up_down = None
            N.node_type = 'leaf'
            N.data = prev_node.data
            N.labels = prev_node.labels
            N.predict_leaf_class()
            self.tree_nodes.append(N)
            prev_node.children.append(N)
            return N

        # remove split attribute from attribute list
        # NOTE(review): in-place removal on the list object shared with the caller.
        if best_attribute in attribute_list:
            attribute_list.remove(best_attribute)

        # check if attribute is discrete NOTE THIS LINE NEEDS TO BE MODIFIED FOR DIFFERENT DATASET
        # The distinct-value count is taken from the full self.dataset, not from D.
        if len(self.dataset[
                   best_attribute].unique()) > 5:  # max 5 discrete categories in attributes from Thyroid set
            # continuous, divide up data at mid point of the values ai + ai1/2
            l_part, r_part, split_val = self.continuous_attribute_data_partition(D, best_attribute)
            N.split_criterion = split_val
            N.split_up_down = 'UP'
            l_child = self.grow_tree(N, attribute_list, l_part)  # upper -> att_val > split_val
            # N_V is the sibling of N that carries the "<= split_val" side of the same split.
            N_V = Node(D[0], D[1], attribute_list, 'node')
            N_V.depth = N.depth
            N_V.best_attribute = best_attribute
            N_V.split_criterion = split_val
            N_V.parent = prev_node
            N_V.split_up_down = 'DOWN'
            r_child = self.grow_tree(N_V, attribute_list, r_part)  # lower -> att_val <= split_val
            # NOTE(review): every return path of the recursive call has already appended its
            # result to the children of the node it was given, so these two appends appear to
            # add each child a second time.
            N.children.append(l_child)
            N_V.children.append(r_child)
            N.parent = prev_node
            self.tree_nodes.append(N)
            self.tree_nodes.append(N_V)
            prev_node.children.append(N)
            prev_node.children.append(N_V)
            return N
        else:
            # discrete, partition based on unique values of attribute to create nodes for recursion
            vals = self.dataset[best_attribute].unique()  # D[0][best_attribute].unique()
            for v in list(vals):
                data_part = self.partition_data(D, best_attribute, v)

                # NOTE(review): partition_data returns a 2-tuple, which is always truthy, so
                # this branch is never taken; empty partitions go through the recursion below.
                if not data_part:  # TOGGLED TO EMPTY CAUSES 2 LEAVES ONLY TO BE MADE ** check this
                    # majority class leaf node computed of D
                    L = Node(D[0], D[1], attribute_list, 'leaf')
                    L.depth = N.depth + 1
                    L.best_attribute = best_attribute
                    L.split_criterion = v
                    L.predict_leaf_class()  # determine the class of the leaf
                    self.tree_nodes.append(L)
                    N.children.append(L)
                    L.parent = N
                else:
                    # recursion: one N_V node per attribute value, attached directly to
                    # prev_node; it is not appended to self.tree_nodes here
                    N_V = Node(D[0], D[1], attribute_list, 'node')
                    N_V.depth = N.depth
                    N_V.best_attribute = best_attribute
                    N_V.split_criterion = v
                    N_V.parent = prev_node
                    N_V.parent.children.append(N_V)
                    # The returned subtree root is not used; the call links it itself.
                    child = self.grow_tree(N_V, attribute_list, data_part)

        # Only reached from the discrete branch: register N itself once.
        if N not in self.tree_nodes:
            self.tree_nodes.append(N)
            prev_node.children.append(N)
        return N

    def continuous_attribute_data_partition(self, D, attribute):
        """
            Creates data partitions (left and right) for continuous attributes, computing the mid point that
            enables the best information gain ratio to be calculated from the partition.
        :param D:
        :param attribute:
        :return: l_part, r_part, split_val
        """
        # sort the data, find the value that will gain the max info gain ratio
        data = D[0].sort_values(by=[attribute])
        split_val = 0
        best_igr = 0
        l_part = []
        r_part = []

        for i in range(0, len(data) - 1):
            mid_point = (float(data.iloc[i][attribute]) + float(data.iloc[i + 1][attribute])) / 2
            left_d = D[0].loc[pd.to_numeric(D[0][attribute]) > mid_point]
            left_idx = D[0].index[pd.to_numeric(D[0][attribute]) > mid_point]
            left_y = D[1].loc[left_idx]
            right_d = D[0].loc[pd.to_numeric(D[0][attribute]) <= mid_point]
            right_idx = D[0].index[pd.to_numeric(D[0][attribute]) <= mid_point]
            right_y = D[1].loc[right_idx]
            igr = self.compute_info_gain_ratio_continuous(D, left_y, right_y)

            if igr >= best_igr:
                best_igr = igr
                split_val = mid_point
                l_part = (left_d, left_y)
                r_part = (right_d, right_y)

        return l_part, r_part, split_val

    def compute_info_gain_ratio_continuous(self, D, left_y, right_y):
        """
            Computes the information gain ratio for a continuous attribute partition
        :return info_gain_ratio
        """
        l_y = left_y
        r_y = right_y

        dataset_entropy = self.data_entropy(D[1])
        l_part_entropy = self.data_entropy(l_y)
        l_p_j = float(len(l_y) / len(D))
        l_ent = l_p_j * l_part_entropy
        r_part_entropy = self.data_entropy(r_y)
        r_p_j = float(len(r_y) / len(D))
        r_ent = r_p_j * r_part_entropy

        split_info = - self.split_info(l_p_j) - self.split_info(r_p_j)
        att_ent = l_ent + r_ent

        if split_info == 0:  # prevent division by zero for ratio
            return 0
        else:
            info_gain = self.information_gain(dataset_entropy, att_ent)
            info_gain_ratio = self.information_gain_ratio(info_gain,
                                                          split_info)
        return info_gain_ratio

    @staticmethod
    def check_same_class_labels(labels):
        """
            Checks set of labels to ensure they are of the same class type
        :param labels:
        :return: bool
        """
        if len(set(labels)) == 1:
            return True
        else:
            return False

    def attribute_selection_method(self, D, attribute_list):
        """
            Attribute Selection Method for decision tree as discussed in [1] (Figure 8.3), selects attribute that
            provides the best information gain ratio as a result.
        :param D:
        :param attribute_list:
        :return: best_attribute
        """
        best_attribute = ''
        dataset_entropy = self.data_entropy(D[1])
        best_info_gain_ratio = 0.0
        split_val = ''

        for attribute in attribute_list:
            # a_idx = self.attributes.get(attribute) MIGHT NEED THIS
            v = D[0][attribute].unique()  # find v distinct values of attribute
            att_ent = 0.0
            split_info = 0.0
            curr_val = ''
            val_ent = 0.0
            for val in v:
                data_partition = self.partition_data(D, attribute, val)
                partition_labels = data_partition[1]
                part_entropy = self.data_entropy(partition_labels)
                p_j = float(len(data_partition[1]) / len(D[1]))
                att_ent = att_ent + (p_j * part_entropy)
                split_info = split_info - self.split_info(p_j)

                if part_entropy > val_ent:
                    val_ent = part_entropy
                    curr_val = val

            # Best Attribute checks
            if split_info == 0:  # prevent division by zero for ratio
                continue
            else:
                info_gain = self.information_gain(dataset_entropy, att_ent)
                info_gain_ratio = self.information_gain_ratio(info_gain,
                                                              split_info)  # calculate info gain ratio to select

            # compare the top performing attribute info gain ratio
            if info_gain_ratio > best_info_gain_ratio:
                best_info_gain_ratio = info_gain_ratio
                best_attribute = attribute
                split_val = curr_val
        return best_attribute, split_val

    def class_prob(self, feature_label, labels):
        """
            Computes class probabilities from labels
        :param feature_label:
        :param labels:
        :return: p
        """
        c = collections.Counter(labels)  # [4]
        p = c[feature_label] / len(labels)
        return float(p)

    def data_entropy(self, labels):
        """
            Computes the Entropy, or Info(D) [1]
        :param labels:
        :return: entropy
        """
        entropy = 0.0
        class_freq = collections.Counter(labels)  # [4]
        for l in class_freq.keys():
            p = float(class_freq[l] / len(labels))
            entropy = entropy - math.log(p, 2)
        return entropy

    def information_gain(self, dataset_entropy, attribute_entropy):
        """
            Computes information gain based on the data entropy and attribute entropy [1]
        :param dataset_entropy:
        :param attribute_entropy:
        :return: gain
        """
        gain = dataset_entropy - attribute_entropy
        return gain

    def split_info(self, p_j):
        """
            Computes the information split, used in gain ratio [1]
        :param p_j:
        :return: info_split
        """
        # error protection for zero case
        if p_j == 0:
            return 0

        info_split = (p_j * math.log(p_j, 2))
        return info_split

    def information_gain_ratio(self, gain, split_info):
        """
            Computes information gain ratio [1]
        :param gain:
        :param split_info:
        :return:
        """
        gain_ratio = float(gain / split_info)
        return gain_ratio

    def partition_data(self, D, attribute, val):
        """
            Partitions a dataset D based on the value of a specific attribute
        :param D:
        :param attribute:
        :param val:
        :return: part, part_y
        """
        part = D[0].loc[D[0][attribute] == val]
        part_idx = D[0].index[D[0][attribute] == val]
        part_y = D[1].loc[part_idx]
        return part, part_y

    def test_tree(self, test_sample, node):
        """
            Using recursion, we go through each node (from the root through to the children) to find a leaf label
            to classify the test sample as a prediction.
        :param test_sample:
        :param node:
        :return: node.leaf_label, or recursion
        """

        if node.node_type == 'leaf':
            return node.leaf_label
        else:
            for child in node.children:
                if (child.best_attribute is None or child.best_attribute == '') and child.node_type == 'leaf':
                    return self.test_tree(test_sample, child)

                if (child.best_attribute is None or child.best_attribute == '') and child.node_type == 'node':
                    pass
                else:
                    if child.split_criterion == test_sample[child.best_attribute]:
                        return self.test_tree(test_sample, child)
                    else:
                        if child.split_up_down == 'UP':
                            # check if att_val > split_criterion
                            if pd.to_numeric(test_sample[child.best_attribute]) > float(child.split_criterion):
                                return self.test_tree(test_sample, child)
                            else:
                                pass
                        elif child.split_up_down == 'DOWN':
                            if pd.to_numeric(test_sample[child.best_attribute]) <= float(child.split_criterion):
                                return self.test_tree(test_sample, child)
                            else:
                                pass

    def predict(self, test_x, test_y):  # TODO Add this functionality from the code in main routine
        # uses test set to predict class labels from the constructed tree
        preds = []
        true_pred = 0
        for i in range(len(test_x)):
            tester_instance = test_x.iloc[i]
            pred = self.test_tree(tester_instance, self.root_node)
            # print(str(i), 'pred', pred, 'label', y.iloc[i])
            if pred == test_y.iloc[i]:
                true_pred += 1
            preds.append(pred)

        return true_pred, preds

    def print_tree(self):
        nodes_created = sorted(self.tree_nodes)
        for n in nodes_created:
            n.print_node()
            for d in n.children:
                d.print_node()
            print()
        return

預處理

In [6]:
import pandas as pd
import numpy as np
from sklearn.tree import DecisionTreeClassifier

# Column names of the Adult data files, in file order (the files carry no header row).
column_names = ['age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status',
                'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss',
                'hours-per-week', 'native-country', 'income']

train_data = pd.read_csv('adult.data', header=None, names=column_names)
# The first line of adult.test is skipped.
test_data = pd.read_csv('adult.test', header=None, skiprows=1, names=column_names)

# Keep a handle on the untouched test rows; the steps below rebind test_data to new frames.
od = test_data

# 'fnlwgt' is treated as unrelated to the target, and 'education' duplicates 'education-num'.
dropped_columns = ['fnlwgt', 'education']
train_data = train_data.drop(dropped_columns, axis=1)
test_data = test_data.drop(dropped_columns, axis=1)


def strip_text_column(col):
    # Trim surrounding whitespace in text columns; other columns pass through unchanged.
    return col.str.strip() if col.dtype == "object" else col


train_data = train_data.apply(strip_text_column)
test_data = test_data.apply(strip_text_column)

# "?" marks a missing value in the raw files: turn it into a missing-value marker.
train_data = train_data.replace("?", pd.NaT)
test_data = test_data.replace("?", pd.NaT)

# Missing categorical values are filled with the most frequent value of the column,
# computed separately for the train and the test frame.
columns_with_gaps = ('workclass', 'occupation', 'native-country')
fill_data = {name: train_data[name].mode()[0] for name in columns_with_gaps}
fill_data_test = {name: test_data[name].mode()[0] for name in columns_with_gaps}

train_data = train_data.fillna(fill_data)
test_data = test_data.fillna(fill_data_test)

# Encode the target: 0 for "<=50K", 1 otherwise (labels in the test file end with a dot).
train_data['income'] = train_data['income'].map(lambda x: 0 if x == "<=50K" else 1)
test_data['income'] = test_data['income'].map(lambda x: 0 if x == '<=50K.' else 1)

train_data.describe()

Unnamed: 0,age,education-num,capital-gain,capital-loss,hours-per-week,income
count,32561.0,32561.0,32561.0,32561.0,32561.0,32561.0
mean,38.581647,10.080679,1077.648844,87.30383,40.437456,0.24081
std,13.640433,2.57272,7385.292085,402.960219,12.347429,0.427581
min,17.0,1.0,0.0,0.0,1.0,0.0
25%,28.0,9.0,0.0,0.0,40.0,0.0
50%,37.0,10.0,0.0,0.0,40.0,0.0
75%,48.0,12.0,0.0,0.0,45.0,0.0
max,90.0,16.0,99999.0,4356.0,99.0,1.0


預處理_正規化、Deal with duplicate

In [7]:
from sklearn.preprocessing import MinMaxScaler
# Numeric columns that are rescaled to the [0, 1] range.
numerical_columns = ['age', 'education-num', 'capital-gain', 'capital-loss', 'hours-per-week']

# Initialize the scaler
scaler = MinMaxScaler()

# Fit the scaler on the training data only. The scaled frame carries the index of the frame it
# is written back into, so the column assignment lines rows up correctly whatever that index is.
train_data_scaled = pd.DataFrame(
    scaler.fit_transform(train_data[numerical_columns]),
    columns=numerical_columns,
    index=train_data.index,
)
train_data[numerical_columns] = train_data_scaled

# Transform the test data using the same (train-fitted) scaler.
test_data_scaled = pd.DataFrame(
    scaler.transform(test_data[numerical_columns]),
    columns=numerical_columns,
    index=test_data.index,
)
test_data[numerical_columns] = test_data_scaled

# One-hot encode the categorical columns with pandas.get_dummies.
categorical_columns = ['workclass', 'marital-status', 'occupation', 'relationship', 'race', 'sex',
                       'native-country']
train_data = pd.get_dummies(train_data, columns=categorical_columns, dtype=int)
test_data = pd.get_dummies(test_data, columns=categorical_columns, dtype=int)

# One-hot encoding only creates columns for categories that occur in each frame, so the test
# frame can lack columns the training frame has (here 'native-country_Holand-Netherlands').
# Give the test frame exactly the training columns, filling the missing ones with 0.
test_data = test_data.reindex(columns=train_data.columns, fill_value=0)

# Drop duplicate rows (the surviving rows keep their original index labels).
train_data.drop_duplicates(inplace=True)
test_data.drop_duplicates(inplace=True)

# Shuffle the training rows.
from sklearn.utils import shuffle
train_data = shuffle(train_data)


測試集準確率

In [8]:
# Move the label column to the last position: C45Tree drops the final column name to obtain
# its attribute list.
td = train_data
column = td.pop("income")
td["income"] = column

columns = list(td.columns)

X_train = td.drop(columns='income')
y_train = td['income']

# Same layout for the test frame.
ted = test_data
t_column = ted.pop("income")
ted["income"] = t_column

X_test = ted.drop(columns='income')
y_test = ted['income']

# Grow the tree on the training data, then classify the test rows.
system_test = C45Tree(columns, td)
system_test.train(X_train, y_train)
true_pred, preds = system_test.predict(X_test, y_test)

print('Full set test accuracy:', true_pred / len(X_test))

Full set test accuracy: 0.814089505158704


預測結果匯出為 Excel（C45.xlsx）

In [9]:
# Export the original (unprocessed) test rows together with the predicted label to Excel.
from openpyxl import Workbook

wb = Workbook()
ws = wb.active
ws.append(['age', 'workclass', 'fnlwgt', 'education', 'education-num', 'marital-status', 'occupation', 'relationship', 'race', 'sex', 'capital-gain', 'capital-loss', 'hours-per-week', 'native-country','income','Predict result'])

# preds is ordered like the rows of X_test, and X_test lost rows when duplicates were dropped.
# Its index labels still identify the original rows, so the raw row is looked up by label
# rather than by position; otherwise predictions would be written next to the wrong rows.
for row_label, pred in zip(X_test.index, preds):
    result = '<=50K.' if pred == 0 else '>50K.'
    # Original row as a list, with the prediction appended as the last cell.
    li = od.loc[row_label].tolist()
    li.append(result)
    ws.append(li)
wb.save('C45.xlsx')

In [10]:
# Banner for the following cell; the printed text is Chinese for "number of nodes!".
print("節點數!")


節點數!


In [11]:
# Number of nodes registered in the trained tree's tree_nodes list.
print(len(system_test.tree_nodes))

113


In [12]:
# Banner for the leaf count below; the printed text is Chinese for "leaves!".
print("葉子!")

葉子!


計算葉節點

In [13]:
# Count the registered nodes whose type is 'leaf'.
leaf_count = sum(1 for node in system_test.tree_nodes if node.node_type == 'leaf')

print(leaf_count)


54
