In [1]:
import pandas as pd
from collections import Counter

In [2]:
train_data = pd.read_csv('./combined-swell-classification-eda-train-dataset.csv')
test_data = pd.read_csv('./combined-swell-classification-eda-test-dataset.csv')
val_data = pd.read_csv('./combined-swell-classification-eda-validation-dataset.csv')

train_X = train_data
test_X = test_data
val_X = val_data

# train_Y, test_Y, val_Y
train_Y = train_X["condition"]
test_Y = test_X["condition"]
val_Y = val_X["condition"]

In [3]:
# train_X
del train_X["NasaTLX class"]
del train_X["Condition Label"]
del train_X["NasaTLX Label"]
del train_X["condition"]
del train_X["subject_id"]

# test_X
del test_X["NasaTLX class"]
del test_X["Condition Label"]
del test_X["NasaTLX Label"]
del test_X["condition"]
del test_X["subject_id"]

# val_X
del val_X["NasaTLX class"]
del val_X["Condition Label"]
del val_X["NasaTLX Label"]
del val_X["condition"]
del val_X["subject_id"]

In [4]:
class Node:
    def __init__(self):
        self.split_feature = None
        self.split_point = None
        self.result = None
        self.childs = None
        

In [5]:
class DecisionTree:
    
    def __init__(self):
        self.root = None
        
    def DT_print(self, cur_node = None, cnt = 0):
        if cnt == 0:
            cur_node = self.root
        print(' ' * cnt, "Level ", cnt," :: ", cur_node.split_feature, cur_node.split_point, cur_node.result)
        if cur_node.childs is None:
            return
        for child in cur_node.childs:
            self.DT_print(child, cnt + 1)
        
    def Gini_Impurity(self, data_Y):  # input label dataset of a group
        impurity = 1
        label_counts = Counter(data_Y)
        for label in label_counts:
            p_of_label = label_counts[label] / len(data_Y)
            impurity -= p_of_label ** 2
        return impurity
    
    def Information_Gain(self, unsplited_data_Y, splited_data_Y):
        gain = self.Gini_Impurity(unsplited_data_Y)
        # print(gain)
        # print(splited_data_Y)
        for subset in splited_data_Y:
            # print("-", Gini_Impurity(subset), " X ( ", len(subset), " / ", len(unsplited_data_Y), " )")
            gain -= self.Gini_Impurity(subset) * (len(subset)/ len(unsplited_data_Y))
        return gain
    
    def Split(self, data_X, data_Y, column):
        data_X_subsets = []    # 분할 후의 data_X 그룹을 저장하는 배열
        data_Y_subsets = []    # 분할 후의 data_Y 그룹을 저장하는 배열
        split_point = 0.0      # 최적 분할 기준 값
        split_point_gain = 0.0  # 최적 분할 지점에서의 Information Gain
        '''
        print("=-=- Before reset Index =-=-=-")
        print("**** data_X ****")
        print(data_X[column])
        print("**** data_Y ****")
        print(data_Y)
        '''
        data_X = data_X.sort_values(by=column)
        data_Y = data_Y[data_X.index]
        data_X = data_X.reset_index(drop=True)
        data_Y = data_Y.reset_index(drop=True)
        '''
        print("=-=- After reset Index =-=-=-")
        print(data_X.index)
        print(data_X[column])
        print(data_Y)

        print(data_X[column].iloc[5], data_Y[5])
        '''
        for i in range(1, len(data_Y)):
            candidate_splited_data_X = []
            candidate_splited_data_Y = []
            if data_Y[i-1] != data_Y[i]:
                # print(i, data_Y[i-1], data_Y[i])
                candidate_point = (data_X[column].iloc[i-1] + data_X[column].iloc[i]) / 2
                candidate_splited_data_Y.append(data_Y[:i])
                candidate_splited_data_Y.append(data_Y[i:])
                gain = self.Information_Gain(data_Y, candidate_splited_data_Y)
                if gain > split_point_gain:
                    candidate_splited_data_X.append(data_X[:i])
                    candidate_splited_data_X.append(data_X[i:])
                    split_point = candidate_point
                    split_point_gain = gain
                    data_X_subsets = candidate_splited_data_X
                    data_Y_subsets = candidate_splited_data_Y
                    # print("== Updated :: ", split_point_gain, split_point)
        return split_point_gain, split_point, data_X_subsets, data_Y_subsets
    
    def Find_Best_Split(self, data_X, data_Y):
        # print("=-=-New Group=-=-")
        best_feature = ''    # 데이터를 분할 할 feature
        best_gain = 0.0       # 데이터를 특정 feature로 분할했을 때 가장 높게 측정된 Information Gain
        best_split_point = 0.0
        for column in data_X.columns:  # RF에서 Bagging Features 적용 필요.
            # print("check column :: ", column)
            gain, split_point = self.Split(data_X, data_Y, column)[0:2]
            if gain > best_gain:
                best_gain = gain
                best_feature = column
                best_split_point = split_point
        return best_feature, best_gain, best_split_point
    
    def fit(self, data_X, data_Y, cnt=0):
        root = Node()

        data_X = data_X.reset_index(drop=True)
        data_Y = data_Y.reset_index(drop=True)

        best_feature, best_gain, best_split_point = self.Find_Best_Split(data_X, data_Y)
        if best_gain == 0:
            root.result = data_Y[0]
            # print(' ' * cnt, "== No Split ", cnt," :: ", root.result)
            return root
        data_X_subsets, data_Y_subsets = self.Split(data_X, data_Y, best_feature)[2:]

        # print(' ' * cnt, "== Split ", cnt," :: ", best_feature, best_gain)
        childs = []
        for i in range(len(data_X_subsets)):
            childs.append(self.fit(data_X_subsets[i], data_Y_subsets[i], cnt+1))

        root.split_feature = best_feature
        root.split_point = best_split_point
        root.childs = childs
        if cnt == 0:
            self.root = root
        return root
    
    def predict(self, dataset):
        result = []
        for i in dataset.index:
            cur_node = self.root
            while cur_node.result is None:
                value = dataset.loc[i, cur_node.split_feature]
                if value < cur_node.split_point:
                    cur_node = cur_node.childs[0]
                else :
                    cur_node = cur_node.childs[1]
            result.append(cur_node.result)
        return result

In [6]:
newDT = DecisionTree()
print(newDT)

newDT.fit(test_X.head(100), test_Y.head(100))

<__main__.DecisionTree object at 0x00000205D3827A88>


<__main__.Node at 0x205d3827a48>

In [7]:
newDT.DT_print()

 Level  0  ::  SKEW_YEO_JONSON 0.6287600117852042 None
  Level  1  ::  MIN_ONSET 0.918701171875 None
   Level  2  ::  None None time pressure
   Level  2  ::  SKEW -1.571927617886033e-16 None
    Level  3  ::  None None interruption
    Level  3  ::  SKEW 1.3895592427316656 None
     Level  4  ::  None None no stress
     Level  4  ::  ALSC 39.5004326609572 None
      Level  5  ::  None None no stress
      Level  5  ::  None None interruption
  Level  1  ::  RANGE_BOXCOX -2.626687010094331 None
   Level  2  ::  KURT 9.023855762998295 None
    Level  3  ::  INSC_APSC 9526.51035036534 None
     Level  4  ::  MEAN_1ST_GRAD -0.000255648014997318 None
      Level  5  ::  STD_PEAKS 5.477192644234585 None
       Level  6  ::  None None interruption
       Level  6  ::  None None no stress
      Level  5  ::  MAX_PEAKS 82.405029296875 None
       Level  6  ::  MIN_PEAKS 2.716064453125 None
        Level  7  ::  None None interruption
        Level  7  ::  None None no stress
       Level  6  

In [8]:
dataset = test_X.loc[0:10]
newDT.predict(dataset)

['no stress',
 'no stress',
 'no stress',
 'no stress',
 'no stress',
 'no stress',
 'no stress',
 'no stress',
 'no stress',
 'interruption',
 'interruption']

In [9]:
test_Y[0:10]

0       no stress
1       no stress
2       no stress
3       no stress
4       no stress
5       no stress
6       no stress
7       no stress
8       no stress
9    interruption
Name: condition, dtype: object