In [1]:
import numpy as np
import random

def sigmoid(x):
    return 1 / (1+np.exp(-x))

In [2]:
class DataGeneration:
    
    def __init__(self, name, file_path, seperation_rate, is_normalized=False):
        
        self.name = name
        
        self.file_path = file_path
        
        self.seperation_rate = seperation_rate
        
        self.is_normalized = is_normalized
        
        print("DataGeneration object is created !!")
    
    # np.max() 를 이용한 데이터 정규화
    def data_normalize_using_max(self, loaded_data):
        
        # 각 열의 최대값을 찾기 위해 행과 열을 바꾸어 줌. 즉 전치향렬을 만들어줌
        transpose_loaded_data = loaded_data.T        

        print("transpose_loaded_data.shape = ", transpose_loaded_data.shape)
        
        # 전치행렬을 위한 리스트
        transpose_normalize_data_list = []

        for index in range(len(transpose_loaded_data)):
    
            max_value = np.max(transpose_loaded_data[index, :])   # 각 행의 최대값을 찾음
    
            # 최대값이 1 이상이면 최대값으로 나누어서 
            # 모든 데이터가 0 ~ 1 사이에 오도록 함
            if max_value > 1.0:  

                transpose_normalize_data_list.append(transpose_loaded_data[index, :] / max_value)

            # 최대값이 1 보다 작으면 해당 값을 그대로 사용함. 
            ## 왜냐하면 1보다 작은 값이면 굳이 바꿀 필요가 없음
            else:      

                transpose_normalize_data_list.append(transpose_loaded_data[index, :])
        

        # 리스트를 numpy type 으로 변환
        transpose_normalize_data = np.array(transpose_normalize_data_list)

        print(transpose_normalize_data.shape)

        # 데이터 저장을 위해 다시 전치행렬을 통해 행과 열을 바꿈
        normalize_data = transpose_normalize_data.T

        print(normalize_data.shape)

        # normalize 파일저장
        data_save_path = './Normalize_' + self.name + '_data.csv'
        
        np.savetxt(data_save_path, normalize_data, delimiter=',')
        
        return normalize_data

    # shuffle 기능을 이용하여 training_data / test_data 생성
    def generate(self):
    
        # 데이터 불러오기

        loaded_data = np.loadtxt(self.file_path, delimiter=',', dtype=np.float32)
        
        print("[debug. before data normalization] loaded_data[0] = ", loaded_data[0])

        print("loaded_data.shape = ", loaded_data.shape)
        
        if (self.is_normalized == True):
            
            loaded_data = self.data_normalize_using_max(loaded_data)
            
            print("[debug. after data normalization] loaded_data[0] = ", loaded_data[0])

        # 임시 저장 리스트
        training_data_list = []
        test_data_list = []

        # 분리비율에 맞게 테스트데이터로 분리
        total_data_num = len(loaded_data)
        test_data_num = int(len(loaded_data) * self.seperation_rate)

        print("total_data_num = ", total_data_num, ", test_data_num = ", test_data_num)

        # 전체 데이터 인덱스를 가지고 있는 리스트 생성
        total_data_index_list = [ index for index in range(total_data_num) ]

        # random.shuffle 을 이용하여 인덱스 리스트 생성
        random.shuffle(total_data_index_list)  # 전체 인덱스가 랜덤하게 섞여진 리스트로 변형된다

        # test data 를 위한 인덱스는 total_data_index_list 로뷰터 앞에서 40 % 의 데이터 인덱스
        test_data_index_list = total_data_index_list[ 0:test_data_num ]

        print("length of test_data_index_list = ", len(test_data_index_list))

        # training data 를 위한 인덱스는 total_data_index_list 에서 test data 인덱스를 제외한 나머지 부분
        training_data_index_list = total_data_index_list[ test_data_num: ]

        print("length of training_data_index_list = ", len(training_data_index_list))

        # training data 구성
        for training_data_index in training_data_index_list:
    
            training_data_list.append(loaded_data[training_data_index])

        # test data 구성
        for test_data_index in test_data_index_list:
    
            test_data_list.append(loaded_data[test_data_index])

        # generate training data from training_data_list using np.arrya(...)
        training_data = np.array(training_data_list)

        # generate test data from test_data_list using np.arrya(...)
        test_data = np.array(test_data_list)

        # verification shape
        print("training_data.shape = ", training_data.shape)
        print("test_data.shape = ", test_data.shape)

        # save training & test data (.csv)
        training_data_save_path = './random_' + self.name + '_training_data.csv'
        test_data_save_path = './random_' + self.name + '_test_data.csv'
        
        np.savetxt(training_data_save_path, training_data, delimiter=',')
        np.savetxt(test_data_save_path, test_data, delimiter=',')
        
        return training_data, test_data

In [25]:
from datetime import datetime      # datetime.now() 를 이용하여 학습 경과 시간 측정

class ThoracicSurgery:
    
    def __init__(self, input_nodes, hidden1_nodes, hidden2_nodes, output_nodes, learning_rate):
        
        self.input_nodes = input_nodes
        self.hidden1_nodes = hidden1_nodes
        self.hidden2_nodes = hidden2_nodes
        self.output_nodes = output_nodes
        
        # 은닉층 가중치  W2 = Xavier/He 방법으로 self.W2 가중치 초기화
        self.W2 = np.random.rand(self.input_nodes, self.hidden1_nodes) / np.sqrt(self.input_nodes/2)
        self.b2 = np.random.rand(self.hidden1_nodes)      
        
        # 출력층 가중치는 W3 =  Xavier/He 방법으로 self.W3 가중치 초기화
        self.W3 = np.random.rand(self.hidden1_nodes, self.hidden2_nodes) / np.sqrt(self.hidden1_nodes/2)
        self.b3 = np.random.rand(self.hidden2_nodes)      
        
        self.W4 = np.random.rand(self.hidden2_nodes, self.output_nodes) / np.sqrt(self.hidden2_nodes/2)
        self.b4 = np.random.rand(self.output_nodes)
        # 출력층 선형회귀 값 Z4, 출력값 A4 정의 (모두 행렬로 표시)
        self.Z4 = np.zeros([1,output_nodes])
        self.A4 = np.zeros([1,output_nodes])
        
        # 출력층 선형회귀 값 Z3, 출력값 A3 정의 (모두 행렬로 표시)
        self.Z3 = np.zeros([1,hidden2_nodes])
        self.A3 = np.zeros([1,hidden2_nodes])
        
        # 은닉층 선형회귀 값 Z2, 출력값 A2 정의 (모두 행렬로 표시)
        self.Z2 = np.zeros([1,hidden1_nodes])
        self.A2 = np.zeros([1,hidden1_nodes])
        
        # 입력층 선형회귀 값 Z1, 출력값 A1 정의 (모두 행렬로 표시)
        self.Z1 = np.zeros([1,input_nodes])    
        self.A1 = np.zeros([1,input_nodes])       
        
        # 학습률 learning rate 초기화
        self.learning_rate = learning_rate
        
    def feed_forward(self):  
        
        delta = 1e-7    # log 무한대 발산 방지
        
        # 입력층 선형회귀 값 Z1, 출력값 A1 계산
        self.Z1 = self.input_data
        self.A1 = self.input_data
        
        # 은닉층 선형회귀 값 Z2, 출력값 A2 계산    
        self.Z2 = np.dot(self.A1, self.W2) + self.b2
        self.A2 = sigmoid(self.Z2)
        
        # 출력층 선형회귀 값 Z3, 출력값 A3 계산
        self.Z3 = np.dot(self.A2, self.W3) + self.b3
        self.A3 = sigmoid(self.Z3)
        
        self.Z4 = np.dot(self.A3, self.W4) + self.b4
        self.A4 = sigmoid(self.Z4)
        
        return  -np.sum( self.target_data*np.log(self.A4 + delta) + (1-self.target_data)*np.log((1 - self.A4)+delta ) )    
    
    def loss_val(self):
        
        delta = 1e-7    # log 무한대 발산 방지
        
        # 입력층 선형회귀 값 Z1, 출력값 A1 계산
        self.Z1 = self.input_data
        self.A1 = self.input_data
        
        # 은닉층 선형회귀 값 Z2, 출력값 A2 계산    
        self.Z2 = np.dot(self.A1, self.W2) + self.b2
        self.A2 = sigmoid(self.Z2)
        
        # 출력층 선형회귀 값 Z3, 출력값 A3 계산
        self.Z3 = np.dot(self.A2, self.W3) + self.b3
        self.A3 = sigmoid(self.Z3)
        
        self.Z4 = np.dot(self.A3, self.W4) + self.b4
        self.A4 = sigmoid(self.Z4)
        
        return  -np.sum( self.target_data*np.log(self.A4 + delta) + (1-self.target_data)*np.log((1 - self.A4)+delta ) )    
    
    # accuracy method
    def accuracy(self, input_data, target_data):
        
        matched_list = []
        not_matched_list = []
        
        # list which contains (index, label, prediction) value
        index_label_prediction_list = []
        
        temp_list = []
        
        for index in range(len(input_data)):
            
            # predict 를 위해서 vector 을 matrix 로 변환하여 인수로 넘겨줌
            (real_val, predicted_num) = self.predict(np.array(input_data[index], ndmin=2)) 
            
            if predicted_num == target_data[index]:
                matched_list.append(index)
                
            else:
                not_matched_list.append(index)
                
                temp_list.append(index)
                temp_list.append(target_data[index])
                temp_list.append(predicted_num)
                
                index_label_prediction_list.append(temp_list)
                
                temp_list = []
                
        accuracy_result = len(matched_list) / len(input_data)
            
        print("Accuracy => ", accuracy_result)
            
        return matched_list, not_matched_list, index_label_prediction_list
    
    # train method
    def train(self, input_data, target_data):   
        
        self.target_data = target_data    
        self.input_data = input_data
        
        # 먼저 feed forward 를 통해서 최종 출력값과 이를 바탕으로 현재의 에러 값 계산
        loss_val = self.feed_forward()
        
        loss_4 = (self.A4-self.target_data) * self.A4 * (1-self.A4)
        
        self.W4 = self.W4 - self.learning_rate * np.dot(self.A3.T,loss_4)
        self.b3 = self.b3 - self.learning_rate * loss_4
        
        # 출력층 loss 인 loss_3 구함
        loss_3 = np.dot(loss_4,self.W4.T) * self.A3 * (1-self.A3)
                        
        # 출력층 가중치 W3, 출력층 바이어스 b3 업데이트
        self.W3 = self.W3 - self.learning_rate * np.dot(self.A2.T, loss_3)   
        
        self.b3 = self.b3 - self.learning_rate * loss_3
        
        # 은닉층 loss 인 loss_2 구함        
        loss_2 = np.dot(loss_3, self.W3.T) * self.A2 * (1-self.A2)
        
        # 은닉층 가중치 W2, 은닉층 바이어스 b2 업데이트
        self.W2 = self.W2 - self.learning_rate * np.dot(self.A1.T, loss_2)   
        
        self.b2 = self.b2 - self.learning_rate * loss_2
        
    def predict(self, input_data):        # input_data 는 행렬로 입력됨     
        
        Z2 = np.dot(input_data, self.W2) + self.b2
        A2 = sigmoid(Z2)
        
        Z3 = np.dot(A2, self.W3) + self.b3
        A3 = sigmoid(Z3)
        
        Z4 = np.dot(A3, self.W4) + self.b4
        y = A4 = sigmoid(Z4)
        
        if y >= 0.5:
            predicted_num = 1
        else:
            predicted_num = 0
    
        return y, predicted_num

### Normalize 하지 않은 데이터를 이용하여 학습 진행

In [26]:
# DataGeneration 객체 생성
seperation_rate = 0.3
data_obj = DataGeneration('ThoracicSurgery', './ThoracicSurgery.csv', seperation_rate)  # normalize 하지 않음
(training_data, test_data) = data_obj.generate()

print("training_data.shape = ", training_data.shape)
print("test_data.shape = ", test_data.shape)

#hyper-parameter
i_nodes = training_data.shape[1] - 1    # input nodes 개수
h1_nodes = 50  # hidden nodes 개수, 
h2_nodes = 20
o_nodes = 1    # output nodes 개수
lr = 1e-1      # learning rate
epochs = 200   # 반복횟수

# ThoracicSurgery 객체 생성
obj = ThoracicSurgery(i_nodes, h1_nodes, h2_nodes, o_nodes, lr)

print("Neural Network Learning using BackPropagation...")

start_time = datetime.now()

for step in range(epochs):
    
    for index in range(len(training_data)):
        
        input_data = training_data[index, 0:-1]
        target_data = training_data[index, [-1]]
        
        obj.train( np.array(input_data, ndmin=2), np.array([target_data], ndmin=2) )  # 행렬로 입력
        
    if step % 10 == 0:
        print("epochs = ", step, "loss value = ", obj.loss_val())

end_time = datetime.now()
        
print("")
print("Elapsed Time => ", end_time - start_time)

DataGeneration object is created !!
[debug. before data normalization] loaded_data[0] =  [293.    1.    3.8   2.8   0.    0.    0.    0.    0.    0.   12.    0.
   0.    0.    1.    0.   62.    0. ]
loaded_data.shape =  (470, 18)
total_data_num =  470 , test_data_num =  141
length of test_data_index_list =  141
length of training_data_index_list =  329
training_data.shape =  (329, 18)
test_data.shape =  (141, 18)
training_data.shape =  (329, 18)
test_data.shape =  (141, 18)
Neural Network Learning using BackPropagation...
epochs =  0 loss value =  1.709315252040008
epochs =  10 loss value =  1.7095215908507597
epochs =  20 loss value =  1.7094888424242283
epochs =  30 loss value =  1.7094567670484475
epochs =  40 loss value =  1.70942534208991
epochs =  50 loss value =  1.7093945459928301
epochs =  60 loss value =  1.7093643582105211
epochs =  70 loss value =  1.7093347591428494
epochs =  80 loss value =  1.7093057300789063
epochs =  90 loss value =  1.709277253144237
epochs =  100 los

In [14]:
test_input_data = test_data[ :, 0:-1 ]
test_target_data = test_data[ :, -1 ]

(true_list, false_list, index_label_prediction_list) = obj.accuracy(test_input_data, test_target_data) 

Accuracy =>  0.9078014184397163


In [7]:
print(index_label_prediction_list)

[[4, 1.0, 0], [10, 1.0, 0], [15, 1.0, 0], [20, 1.0, 0], [21, 1.0, 0], [31, 1.0, 0], [34, 1.0, 0], [37, 1.0, 0], [39, 1.0, 0], [46, 1.0, 0], [48, 1.0, 0], [49, 1.0, 0], [57, 1.0, 0], [58, 1.0, 0], [66, 1.0, 0], [79, 1.0, 0], [90, 1.0, 0], [92, 1.0, 0], [100, 1.0, 0], [101, 1.0, 0], [105, 1.0, 0], [113, 1.0, 0], [119, 1.0, 0], [121, 1.0, 0], [125, 1.0, 0], [128, 1.0, 0], [133, 1.0, 0], [140, 1.0, 0]]


### Normalize 데이터를 이용하여 학습 진행

In [9]:
# DataGeneration 객체 생성
seperation_rate = 0.3
data_obj = DataGeneration('ThoracicSurgery', './ThoracicSurgery.csv', seperation_rate, True)  # normalize 
(training_data, test_data) = data_obj.generate()

print("training_data.shape = ", training_data.shape)
print("test_data.shape = ", test_data.shape)

#hyper-parameter
i_nodes = training_data.shape[1] - 1    # input nodes 개수
h1_nodes = 50  # hidden nodes 개수, 
h2_nodes = 20
o_nodes = 1    # output nodes 개수
lr = 1e-3      # learning rate
epochs = 200   # 반복횟수

# ThoracicSurgery 객체 생성
obj = ThoracicSurgery(i_nodes, h1_nodes, h2_nodes, o_nodes, lr)

print("Neural Network Learning using BackPropagation...")

start_time = datetime.now()

for step in range(epochs):
    
    for index in range(len(training_data)):
        
        input_data = training_data[index, 0:-1]
        target_data = training_data[index, [-1]]
        
        obj.train( np.array(input_data, ndmin=2), np.array([target_data], ndmin=2) )  # 행렬로 입력
        
    if step % 10 == 0:
        print("epochs = ", step, "loss value = ", obj.loss_val())

end_time = datetime.now()
        
print("")
print("Elapsed Time => ", end_time - start_time)

DataGeneration object is created !!
[debug. before data normalization] loaded_data[0] =  [293.    1.    3.8   2.8   0.    0.    0.    0.    0.    0.   12.    0.
   0.    0.    1.    0.   62.    0. ]
loaded_data.shape =  (470, 18)
transpose_loaded_data.shape =  (18, 470)
(18, 470)
(470, 18)
[debug. after data normalization] loaded_data[0] =  [0.62340426 0.125      0.60317457 0.03244496 0.         0.
 0.         0.         0.         0.         0.85714287 0.
 0.         0.         1.         0.         0.7126437  0.        ]
total_data_num =  470 , test_data_num =  141
length of test_data_index_list =  141
length of training_data_index_list =  329
training_data.shape =  (329, 18)
test_data.shape =  (141, 18)
training_data.shape =  (329, 18)
test_data.shape =  (141, 18)
Neural Network Learning using BackPropagation...
epochs =  0 loss value =  3.495110858345409
epochs =  10 loss value =  0.42124051527983275
epochs =  20 loss value =  0.18947318931495558
epochs =  30 loss value =  0.163922

In [10]:
test_input_data = test_data[ :, 0:-1 ]
test_target_data = test_data[ :, -1 ]

(true_list, false_list, index_label_prediction_list) = obj.accuracy(test_input_data, test_target_data) 

Accuracy =>  0.8297872340425532


In [11]:
print(index_label_prediction_list)

[[4, 1.0, 0], [5, 1.0, 0], [9, 1.0, 0], [26, 1.0, 0], [37, 1.0, 0], [43, 1.0, 0], [47, 1.0, 0], [52, 1.0, 0], [54, 1.0, 0], [64, 1.0, 0], [65, 1.0, 0], [70, 1.0, 0], [73, 1.0, 0], [76, 1.0, 0], [78, 1.0, 0], [86, 1.0, 0], [91, 1.0, 0], [97, 1.0, 0], [106, 1.0, 0], [110, 1.0, 0], [111, 1.0, 0], [120, 1.0, 0], [121, 1.0, 0], [132, 1.0, 0]]
