#### DataGeneration ver1 이용한 training data / test data 생성 후,
#### 생성된 training data / test data 에 포함된 target distribution 확인 

In [1]:
import numpy as np
import random
from datetime import datetime

# 수치미분 함수

def numerical_derivative(f, x):
    """Estimate the gradient of ``f`` with respect to ``x`` by central differences.

    Every element of ``x`` is nudged in place by +1e-4 and then by -1e-4,
    ``f(x)`` is evaluated at both points, and the element is put back to
    its original value before moving on.  Because the perturbation is
    written into ``x`` itself, ``f`` also observes it when it reads the
    same array through another reference instead of its argument.

    Args:
        f: Callable invoked as ``f(x)``; its result is used as a scalar.
        x: Writable NumPy array holding the point of evaluation.

    Returns:
        An array created with ``np.zeros_like(x)`` whose entries are the
        estimated partial derivatives.
    """
    step = 1e-4  # 0.0001
    gradient = np.zeros_like(x)

    cursor = np.nditer(x, flags=['multi_index'], op_flags=['readwrite'])

    for _ in cursor:
        position = cursor.multi_index
        original = x[position]

        # f(x + step)
        x[position] = float(original) + step
        forward = f(x)

        # f(x - step)
        x[position] = original - step
        backward = f(x)

        gradient[position] = (forward - backward) / (2 * step)

        # Restore the element so the next partial derivative starts clean.
        x[position] = original

    return gradient

# sigmoid 함수

def sigmoid(x):
    """Return the logistic function ``1 / (1 + exp(-x))`` of ``x``.

    ``x`` is passed to ``np.exp``, so scalars and arrays are both accepted
    and arrays are processed element-wise.
    """
    decay = np.exp(-x)
    return 1 / (1 + decay)

#### Diabetes 클래스 정의

In [2]:
# Diabetes Class

class Diabetes:
    """Binary classifier with one hidden layer, trained by numerical differentiation.

    The network computes ``sigmoid(sigmoid(x . W2 + b2) . W3 + b3)`` and is
    fitted by gradient descent on the cross-entropy loss.  Gradients are
    obtained from the module-level ``numerical_derivative`` helper and the
    activation from the module-level ``sigmoid`` helper.
    """

    def __init__(self, input_nodes, hidden_nodes, output_nodes, learning_rate):
        """Create the network with uniformly random weights and biases.

        Args:
            input_nodes: Number of input features.
            hidden_nodes: Number of units in the hidden layer.
            output_nodes: Number of output units.
            learning_rate: Step size used by ``train``.
        """
        # Hidden layer (layer 2): weights W2 and bias b2.
        self.W2 = np.random.rand(input_nodes, hidden_nodes)
        self.b2 = np.random.rand(hidden_nodes)

        # Output layer (layer 3): weights W3 and bias b3.
        self.W3 = np.random.rand(hidden_nodes, output_nodes)
        self.b3 = np.random.rand(output_nodes)

        self.learning_rate = learning_rate

        print("Diabetes object is created !!!")

    def _forward(self, input_data):
        """Return the output-layer activation for ``input_data``."""
        z2 = np.dot(input_data, self.W2) + self.b2
        a2 = sigmoid(z2)

        z3 = np.dot(a2, self.W3) + self.b3
        return sigmoid(z3)

    def feed_forward(self):
        """Return the cross-entropy loss for the data stored by ``train``.

        Reads ``self.input_data`` and ``self.target_data``; both are set by
        ``train``, so this raises ``AttributeError`` before the first call
        to ``train``.
        """
        delta = 1e-7  # keeps log() away from log(0) = -inf

        y = self._forward(self.input_data)

        # cross-entropy
        return -np.sum(
            self.target_data * np.log(y + delta)
            + (1 - self.target_data) * np.log((1 - y) + delta)
        )

    def get_W_b(self):
        """Return the parameters as the tuple ``(W2, b2, W3, b3)``."""
        return self.W2, self.b2, self.W3, self.b3

    def loss_val(self):
        """Return the current loss value.

        This is the same cross-entropy that ``feed_forward`` computes, so it
        is evaluated on the data most recently passed to ``train``.
        """
        return self.feed_forward()

    def predict(self, input_data):
        """Predict the class of ``input_data``.

        Args:
            input_data: Feature values to classify.  The ``y >= 0.5`` test
                below needs a single truth value, so this presumably
                expects one sample and one output node - TODO confirm.

        Returns:
            ``(y, result)`` where ``y`` is the network output and ``result``
            is 1 when ``y >= 0.5`` and 0 otherwise.
        """
        y = self._forward(input_data)

        result = 1 if y >= 0.5 else 0

        return y, result

    def accuracy(self, input_data, target_data):
        """Print the accuracy on a data set and report every prediction.

        Args:
            input_data: Sequence of samples, each passed to ``predict``.
            target_data: Expected labels, indexed like ``input_data``.

        Returns:
            ``(matched_list, not_matched_list, index_label_prediction_list)``:
            the indices predicted correctly, the indices predicted wrongly,
            and one ``[index, label, prediction]`` list per wrong prediction.
            For an empty ``input_data`` the accuracy is printed as 0.0 and
            three empty lists are returned.
        """
        matched_list = []
        not_matched_list = []

        # One [index, label, prediction] entry per misclassified sample.
        index_label_prediction_list = []

        for index, sample in enumerate(input_data):

            (real_val, logical_val) = self.predict(sample)
            label = target_data[index]

            if logical_val == label:
                matched_list.append(index)
            else:
                not_matched_list.append(index)
                index_label_prediction_list.append([index, label, logical_val])

        total = len(input_data)

        # Guard against ZeroDivisionError when there is nothing to evaluate.
        accuracy_result = len(matched_list) / total if total else 0.0

        print("Accuracy => ", accuracy_result)

        return matched_list, not_matched_list, index_label_prediction_list

    def train(self, input_data, target_data):
        """Run one gradient-descent step on the given data.

        The data is stored on the instance so that ``feed_forward`` and
        ``loss_val`` can read it, then every parameter is moved against its
        numerically estimated gradient.

        Args:
            input_data: Training input.
            target_data: Expected output for ``input_data``.
        """
        self.input_data = input_data
        self.target_data = target_data

        # The argument is ignored on purpose: numerical_derivative perturbs
        # the array it is given in place, and that array is the parameter
        # feed_forward reads from self, so the perturbation is still seen.
        f = lambda x: self.feed_forward()

        self.W2 -= self.learning_rate * numerical_derivative(f, self.W2)

        self.b2 -= self.learning_rate * numerical_derivative(f, self.b2)

        self.W3 -= self.learning_rate * numerical_derivative(f, self.W3)

        self.b3 -= self.learning_rate * numerical_derivative(f, self.b3)

#### DataGeneration class 이용하여 training_data,  test_data 분리

In [3]:
class DataGeneration:
    """Split a comma-separated numeric file into shuffled training and test sets.

    Attributes are documented on ``__init__``; the work happens in
    ``generate``.
    """

    def __init__(self, name, file_path, seperation_rate):
        """Store the split configuration.

        Args:
            name: Prefix of the two CSV files written by ``generate``.
            file_path: Path of the comma-separated source file.
            seperation_rate: Fraction of the rows that becomes test data.
        """
        self.name = name

        self.file_path = file_path

        self.seperation_rate = seperation_rate

    def generate(self):
        """Load the file, shuffle the rows and split them.

        The first ``int(row_count * seperation_rate)`` shuffled rows become
        the test set and the rest the training set.  Both sets are also
        saved to ``./<name>_training_data.csv`` and
        ``./<name>_test_data.csv``.

        Returns:
            ``(training_data, test_data)`` as NumPy arrays.  An empty split
            keeps the column layout of the loaded data (for example shape
            ``(0, n_columns)``), so column slicing still works on it.

        Raises:
            Exception: If the source file cannot be loaded or an output file
                cannot be written.  The original error is attached as the
                cause.
        """
        # Load the data; a missing or malformed file raises here.
        try:
            loaded_data = np.loadtxt(self.file_path, delimiter=',', dtype=np.float32)

        except Exception as err:
            print('[DataGeneration::generate()]  ', str(err))
            raise Exception(str(err)) from err

        print("[DataGeneration]  loaded_data.shape = ", loaded_data.shape)

        # Number of rows that go to the test set according to the rate.
        total_data_num = len(loaded_data)
        test_data_num = int(total_data_num * self.seperation_rate)

        print("[DataGeneration]  total_data_num = ", total_data_num, ", test_data_num = ", test_data_num)

        # Shuffle the row indices; the rows themselves are selected below.
        total_data_index_list = list(range(total_data_num))
        random.shuffle(total_data_index_list)

        # The leading part of the shuffled indices is the test set ...
        test_data_index_list = total_data_index_list[:test_data_num]

        print("[DataGeneration]  length of test_data_index_list = ", len(test_data_index_list))

        # ... and everything after it is the training set.
        training_data_index_list = total_data_index_list[test_data_num:]

        print("[DataGeneration]  length of training_data_index_list = ", len(training_data_index_list))

        # Select the rows with index arrays.  Unlike np.array() on a list of
        # rows, this keeps the column count when a split is empty.
        training_data = loaded_data[np.asarray(training_data_index_list, dtype=np.intp)]
        test_data = loaded_data[np.asarray(test_data_index_list, dtype=np.intp)]

        # verification shape
        print("[DataGeneration]  training_data.shape = ", training_data.shape)
        print("[DataGeneration]  test_data.shape = ", test_data.shape)

        # save training & test data (.csv)
        training_data_save_path = './' + self.name + '_training_data.csv'
        test_data_save_path = './' + self.name + '_test_data.csv'

        # Writing raises when the target cannot be written.
        try:
            np.savetxt(training_data_save_path, training_data, delimiter=',')
            np.savetxt(test_data_save_path, test_data, delimiter=',')

        except Exception as err:
            print('[DataGeneration::generate()]  ', str(err))
            raise Exception(str(err)) from err

        return training_data, test_data

#### DataGeneration 객체 생성.
#### 생성된 training data / test data 에 대한 정답(target) 분포 확인 기능 추가 
#### DataGeneration Class 내부에 구현하지 않고, 아래처럼 별도 구현한 이유는,
#### 각 데이터 정답이 어떤 경우는 0 열에 있고 어떤 경우에는 마지막열에 있기 때문에
#### DataGeneration Class 내부에서 일반화 시키기 어려움

In [4]:
# Create a DataGeneration object and check the target (label) distribution
# of the original data and of the generated training / test data.
# The check lives here rather than inside DataGeneration because the target
# column is column 0 in some data sets and the last column in others.

seperation_rate = 0.3  # fraction of the rows used as test data


def _report_target_distribution(label, target_data):
    """Print how many 0.0 and 1.0 targets ``target_data`` contains.

    Args:
        label: Name of the data set shown in the output
            (``original``, ``training`` or ``test``).
        target_data: One-dimensional array of target values.

    Returns:
        ``(num_zeros, num_ones)``.  A class that does not occur is counted
        as 0 instead of raising ``KeyError``, and the ratio of an empty
        data set is reported as 0.0.
    """
    unique, counts = np.unique(target_data, return_counts=True)
    distribution = dict(zip(unique, counts))

    print('kinds of ' + label + ' target value = ', distribution.items())

    num_zeros = distribution.get(0.0, 0)  # count of target value 0.0
    num_ones = distribution.get(1.0, 0)   # count of target value 1.0

    total = len(target_data)
    zero_ratio = 100 * num_zeros / total if total else 0.0
    one_ratio = 100 * num_ones / total if total else 0.0

    print('number of zeros (' + label + ') = ', num_zeros, ', ratio = ', zero_ratio, ' %')
    print('number of ones (' + label + ') = ', num_ones, ', ratio = ', one_ratio, '%')

    return num_zeros, num_ones


try:
    # Target distribution of the original data (target is the last column).
    loaded_data = np.loadtxt('./(191109)diabetes.csv', delimiter=',', dtype=np.float32)

    loaded_target_data = loaded_data[:, -1]

    print('====================================================================')
    num_zeros, num_ones = _report_target_distribution('original', loaded_target_data)
    print('====================================================================')

    # Split into training data / test data with DataGeneration.
    data_obj = DataGeneration('Diabetes', './(191109)diabetes.csv', seperation_rate)

    (training_data, test_data) = data_obj.generate()

    # Target distribution of both splits (target is the last column).
    training_target_data = training_data[:, -1]
    test_target_data = test_data[:, -1]

    print('====================================================================')
    num_zeros, num_ones = _report_target_distribution('training', training_target_data)

    print('====================================================================')
    num_zeros, num_ones = _report_target_distribution('test', test_target_data)
    print('====================================================================')

except Exception as err:

    # Best-effort cell: report the problem instead of stopping the notebook.
    print('Exception Occur !!')
    print(str(err))

[DataGeneration]  loaded_data.shape =  (759, 9)
[DataGeneration]  total_data_num =  759 , test_data_num =  227
[DataGeneration]  length of test_data_index_list =  227
[DataGeneration]  length of training_data_index_list =  532
[DataGeneration]  training_data.shape =  (532, 9)
[DataGeneration]  test_data.shape =  (227, 9)
kinds of training target value =  dict_items([(0.0, 187), (1.0, 345)])
number of zeros (training) =  187 , ratio =  35.150375939849624  %
number of ones (training) =  345 , ratio =  64.84962406015038 %
kinds of test target value =  dict_items([(0.0, 76), (1.0, 151)])
number of zeros (test) =  76 , ratio =  33.480176211453745  %
number of ones (test) =  151 , ratio =  66.51982378854626 %


#### Hyper-Parameter 설정 및 train 실행

In [5]:
# Hyper-parameters
i_nodes = training_data.shape[1] - 1    # input nodes: every column except the target
h1_nodes = 20                           # hidden nodes
o_nodes = 1                             # output nodes
lr = 1e-2                               # learning rate
epochs = 10                             # number of passes over the training data

# Build the network.
obj = Diabetes(i_nodes, h1_nodes, o_nodes, lr)

print("Neural Network Learning using Numerical Derivative...")

start_time = datetime.now()

for step in range(epochs):

    # One update per training row: features are all columns but the last,
    # the target is the last column kept as a length-1 array.
    for sample in training_data:

        input_data = sample[0:-1]
        target_data = sample[[-1]]

        obj.train(input_data, target_data)

    # loss_val() reads the data stored by the latest train() call, so the
    # value printed here is the loss of the last row of this pass.
    if step % 2 == 0:
        print("epochs = ", step, "loss value = ", obj.loss_val())

end_time = datetime.now()

print("")
print("Elapsed Time => ", end_time - start_time)

Diabetes object is created !!!
Neural Network Learning using Numerical Derivative...
epochs =  0 loss value =  0.5498720599907942
epochs =  2 loss value =  0.48063496584337145
epochs =  4 loss value =  0.41772353169775667
epochs =  6 loss value =  0.37407286060362466
epochs =  8 loss value =  0.3430429045829218

Elapsed Time =>  0:02:19.068295


#### Accuracy 검증

In [6]:
# Split the test set into features (all columns but the last) and targets
# (the last column), then measure the accuracy of the trained network.
test_input_data, test_target_data = test_data[:, 0:-1], test_data[:, -1]

accuracy_report = obj.accuracy(test_input_data, test_target_data)
true_list, false_list, index_label_prediction_list = accuracy_report

Accuracy =>  0.7180616740088106


In [7]:
# Show every misclassified test sample as [index, label, prediction].
print(index_label_prediction_list)

[[1, 0.0, 1], [3, 0.0, 1], [4, 0.0, 1], [5, 0.0, 1], [7, 0.0, 1], [11, 1.0, 0], [17, 0.0, 1], [19, 0.0, 1], [20, 0.0, 1], [28, 0.0, 1], [33, 0.0, 1], [34, 0.0, 1], [37, 0.0, 1], [38, 0.0, 1], [40, 0.0, 1], [42, 1.0, 0], [47, 0.0, 1], [51, 1.0, 0], [56, 1.0, 0], [63, 0.0, 1], [66, 0.0, 1], [69, 0.0, 1], [71, 0.0, 1], [83, 0.0, 1], [88, 0.0, 1], [91, 1.0, 0], [92, 0.0, 1], [95, 0.0, 1], [98, 0.0, 1], [99, 0.0, 1], [103, 0.0, 1], [106, 1.0, 0], [107, 0.0, 1], [108, 0.0, 1], [112, 1.0, 0], [115, 1.0, 0], [116, 0.0, 1], [118, 0.0, 1], [119, 0.0, 1], [124, 0.0, 1], [130, 0.0, 1], [134, 0.0, 1], [136, 1.0, 0], [137, 1.0, 0], [139, 0.0, 1], [140, 0.0, 1], [142, 0.0, 1], [152, 0.0, 1], [155, 0.0, 1], [163, 0.0, 1], [164, 0.0, 1], [165, 0.0, 1], [168, 0.0, 1], [169, 1.0, 0], [172, 0.0, 1], [175, 0.0, 1], [177, 0.0, 1], [187, 1.0, 0], [194, 0.0, 1], [204, 1.0, 0], [206, 0.0, 1], [221, 0.0, 1], [223, 0.0, 1], [226, 0.0, 1]]
