In [1]:
import os, glob, re, time, csv
import numpy as np

In [2]:
# Fixed seed so that a fresh run of the notebook is reproducible by default.
np.random.seed(1234)


def randomize():
    """Re-seed NumPy's global RNG from the wall clock.

    ``np.random.seed`` rejects float seeds, so the float returned by
    ``time.time()`` is truncated to whole seconds and wrapped into the
    accepted ``[0, 2**32 - 1]`` range before being used.
    """
    np.random.seed(int(time.time()) % (2 ** 32))

In [12]:
# Mean and standard deviation of the normal distribution that init_model()
# draws the initial weights from.
RND_MEAN, RND_STD = 0, 3e-3

# Step size that backprop_neuralnet() applies to every gradient update.
LEARNING_RATE = 1e-3
def main(epoch_count=10, mb_size=10, report=1):
    """Run the full experiment: load the data, initialise the model, train.

    Args:
        epoch_count: Number of training epochs handed to ``trainer``.
        mb_size: Mini-batch size handed to ``trainer``.
        report: Reporting period (in epochs) handed to ``trainer``.
    """
    load_dataset()
    init_model()
    trainer(epoch_count=epoch_count, mb_size=mb_size, report=report)
    
def load_dataset(path='./faults.csv'):
    """Read a CSV file into the module-level ``data`` array.

    The first line is treated as a header and skipped; every remaining
    non-empty row is converted to ``float32``.  The function also sets the
    module-level ``input_cnt`` (27) and ``output_cnt`` (7), which the rest of
    the notebook uses to split each row into features and labels.

    Args:
        path: Location of the CSV file.  Defaults to ``'./faults.csv'`` so
            that existing ``load_dataset()`` calls keep working.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a cell cannot be converted to a float.
    """
    global data, input_cnt, output_cnt
    # The csv module asks for newline='' so that the reader itself handles
    # line endings (including \r\n) consistently.
    with open(path, newline='') as file:
        csvreader = csv.reader(file)
        next(csvreader, None)  # skip the header row, if any
        # Blank lines are returned as empty lists; keeping them would make
        # the row list ragged and break the array conversion below.
        rows = [row for row in csvreader if row]
    input_cnt, output_cnt = 27, 7
    data = np.asarray(rows, dtype='float32')
    
def init_model():
    """Create the module-level ``weight`` and ``bias`` parameters.

    ``weight`` is an ``input_cnt x output_cnt`` matrix drawn from a normal
    distribution with mean ``RND_MEAN`` and standard deviation ``RND_STD``;
    ``bias`` is a zero vector of length ``output_cnt``.
    """
    global weight, bias
    weight_shape = [input_cnt, output_cnt]
    weight = np.random.normal(loc=RND_MEAN, scale=RND_STD, size=weight_shape)
    bias = np.zeros(shape=[output_cnt])
    
def trainer(epoch_count, mb_size, report):
    """Train for ``epoch_count`` epochs and print progress and a final score.

    Args:
        epoch_count: Number of passes over the training mini-batches.
        mb_size: Number of rows per mini-batch.
        report: Print a progress line every ``report`` epochs; a value that
            is not positive suppresses the per-epoch lines.
    """
    step_count = arrange_data(mb_size)
    test_x, test_y = get_test_data()

    for epoch_no in range(1, epoch_count + 1):
        # One (loss, accuracy) pair per mini-batch of this epoch.
        step_results = [
            run_train(*get_train_data(mb_size, step))
            for step in range(step_count)
        ]
        losses = [result[0] for result in step_results]
        accs = [result[1] for result in step_results]

        # Reporting period: report=1 prints after every epoch.
        is_report_epoch = report > 0 and epoch_no % report == 0
        if is_report_epoch:
            acc = run_test(test_x, test_y)
            print("Epoch {} : loss={:5.3f}, accuracy={:5.3f}/{:5.3f}".format(
                epoch_no, np.mean(losses), np.mean(accs), acc))

    final_acc = run_test(test_x, test_y)
    print('\nFinal Test: final accuracy = {:5.3f}'.format(final_acc))

In [4]:
## Data-loader part: shuffling, train/test split and mini-batch access
def arrange_data(mb_size):
    """Shuffle the row order and fix the train/test boundary.

    Sets the module-level ``shuffle_map`` (a random permutation of the row
    indices of ``data``) and ``test_begin_idx`` (the first position in
    ``shuffle_map`` that belongs to the test split).

    Args:
        mb_size: Number of rows per mini-batch.

    Returns:
        The number of whole mini-batches that fit into 80% of the rows.
    """
    global shuffle_map, test_begin_idx
    row_count = data.shape[0]

    shuffle_map = np.arange(row_count)
    np.random.shuffle(shuffle_map)

    train_row_budget = int(row_count * 0.8)
    step_count = train_row_budget // mb_size
    # Everything past the last full training mini-batch is test data.
    test_begin_idx = step_count * mb_size
    return step_count

def get_test_data():
    """Return the test split as a ``(features, labels)`` pair.

    The test rows are the ones addressed by ``shuffle_map`` from
    ``test_begin_idx`` onwards; the last ``output_cnt`` columns of each row
    are the labels and the remaining columns are the features.
    """
    held_out_rows = shuffle_map[test_begin_idx:]
    held_out = data[held_out_rows]
    features = held_out[:, :-output_cnt]
    labels = held_out[:, -output_cnt:]
    return features, labels

def get_train_data(mb_size, nth):
    """Return the ``nth`` training mini-batch as ``(features, labels)``.

    Args:
        mb_size: Number of rows per mini-batch.
        nth: Zero-based index of the mini-batch within the current epoch.
            When it is 0 the training part of ``shuffle_map`` is reshuffled
            in place, so each epoch sees the rows in a new order.
    """
    is_first_batch = nth == 0
    if is_first_batch:
        # The slice is a view, so this permutes shuffle_map itself.
        np.random.shuffle(shuffle_map[:test_begin_idx])

    start = mb_size * nth
    stop = mb_size * (nth + 1)
    batch = data[shuffle_map[start:stop]]
    features, labels = batch[:, :-output_cnt], batch[:, -output_cnt:]
    return features, labels

In [16]:
def run_train(x, y):
    """Run one training step on a mini-batch and update the parameters.

    Args:
        x: Mini-batch features.
        y: Mini-batch labels.

    Returns:
        ``(loss, accuracy)`` for this mini-batch, both computed from the
        forward pass that precedes the parameter update.
    """
    # Forward pass through the network, then through the loss post-processing.
    output, aux_nn = forward_neuralnet(x)
    loss, aux_pp = forward_postproc(output, y)

    # Accuracy of the current predictions.
    accuracy = eval_accuracy(output, y)

    # Backward pass: the gradient of the loss w.r.t. itself is 1; push it back
    # through the post-processing and then through the network.
    G_output = backprop_postproc(1.0, aux_pp)
    backprop_neuralnet(G_output, aux_nn)

    return loss, accuracy

def run_test(x, y):
    """Evaluate the current parameters on ``(x, y)`` without updating them.

    Returns:
        The accuracy reported by ``eval_accuracy`` for the forward-pass output.
    """
    predictions = forward_neuralnet(x)[0]
    return eval_accuracy(predictions, y)

In [7]:
def forward_neuralnet(x):
    """Apply the linear layer ``x @ weight + bias``.

    Returns:
        ``(output, x)``; the input is handed back unchanged so that the
        backward pass can use it.
    """
    logits = x @ weight + bias
    return logits, x

def backprop_neuralnet(G_output, x):
    """Take one gradient-descent step on ``weight`` and ``bias``.

    Args:
        G_output: Gradient of the loss with respect to the layer output.
        x: The input that produced that output (as returned by
            ``forward_neuralnet``).
    """
    global weight, bias
    grad_weight = x.T @ G_output
    grad_bias = G_output.sum(axis=0)

    # In-place updates, scaled by the global learning rate.
    weight -= LEARNING_RATE * grad_weight
    bias -= LEARNING_RATE * grad_bias

In [8]:
def forward_postproc(output, y):
    """Turn the network output into a scalar loss.

    Returns:
        ``(loss, aux)`` where ``loss`` is the mean of the per-sample softmax
        cross-entropy and ``aux`` is ``[y, output, entropy]`` for the
        backward pass.
    """
    entropy = softmax_cross_entropy_with_logits(labels=y, logits=output)
    aux = [y, output, entropy]
    return np.mean(entropy), aux

def backprop_postproc(G_loss, aux):
    """Back-propagate the loss gradient to the network output.

    Args:
        G_loss: Gradient flowing into the loss.
        aux: The ``[y, output, entropy]`` list produced by
            ``forward_postproc``.

    Returns:
        Gradient of the loss with respect to the network output.
    """
    labels, logits, per_sample_entropy = aux

    # The loss is the mean of the entropies, so each one contributes 1/N.
    mean_scale = 1.0 / np.prod(per_sample_entropy.shape)
    G_entropy = mean_scale * G_loss

    return softmax_cross_entropy_with_logits_derv(labels, logits) * G_entropy

In [9]:
def eval_accuracy(output, y):
    """Fraction of rows whose arg-max in ``output`` matches the arg-max in ``y``."""
    predicted = np.argmax(output, axis=1)
    actual = np.argmax(y, axis=1)
    return np.mean(predicted == actual)

In [10]:
def softmax(x):
    """Numerically stable softmax along the last axis.

    For the 2-D ``(batch, classes)`` inputs used in this notebook this is the
    row-wise softmax; 1-D vectors, higher-rank arrays and plain nested lists
    are accepted as well.

    Args:
        x: Array-like of logits.

    Returns:
        An array of the same shape as ``x`` whose entries along the last axis
        are non-negative and sum to 1.
    """
    x = np.asarray(x)
    # Subtracting the per-row maximum leaves the result unchanged but keeps
    # np.exp from overflowing on large logits.
    shifted = x - np.max(x, axis=-1, keepdims=True)
    exp = np.exp(shifted)
    # keepdims lets the row sums broadcast without transposing back and forth.
    return exp / np.sum(exp, axis=-1, keepdims=True)

def softmax_derv(x, y):
    """Per-sample matrix ``derv[n, i, j] = y[n, i] * (delta_ij - y[n, j])``.

    When ``y`` holds softmax outputs this is the Jacobian of the softmax for
    each sample.  Only the shape of ``x`` is used.

    Args:
        x: 2-D array whose shape ``(mb_size, nom_size)`` fixes the result size.
        y: 2-D array of values, at least as large as ``x`` in both dimensions.

    Returns:
        A float64 array of shape ``(mb_size, nom_size, nom_size)``.
    """
    mb_size, nom_size = x.shape
    probs = np.asarray(y)[:mb_size, :nom_size]

    derv = np.empty([mb_size, nom_size, nom_size])
    # Broadcast outer product per sample replaces the triple Python loop.
    derv[...] = -probs[:, :, np.newaxis] * probs[:, np.newaxis, :]

    # Add y[n, i] on the diagonal of every sample's matrix.
    diag = np.arange(nom_size)
    derv[:, diag, diag] += probs
    return derv

def softmax_cross_entropy_with_logits(labels, logits):
    """Per-row cross-entropy between ``labels`` and ``softmax(logits)``.

    A small constant (1e-10) is added to the probabilities before taking the
    logarithm so that a probability of exactly zero does not yield ``-inf``.
    """
    log_probs = np.log(softmax(logits) + 1.0e-10)
    weighted = labels * log_probs
    return -np.sum(weighted, axis=1)

def softmax_cross_entropy_with_logits_derv(labels, logits):
    """Gradient of the softmax cross-entropy w.r.t. the logits: ``softmax(logits) - labels``."""
    probs = softmax(logits)
    return probs - labels

In [18]:
# Run the whole pipeline with main()'s default arguments
# (10 epochs, mini-batches of 10 rows, a report after every epoch).
main()

Epoch 1 : loss=16.683, accuracy=0.275/0.189
Epoch 2 : loss=15.390, accuracy=0.332/0.286
Epoch 3 : loss=15.806, accuracy=0.314/0.430
Epoch 4 : loss=15.732, accuracy=0.317/0.389
Epoch 5 : loss=15.331, accuracy=0.334/0.189
Epoch 6 : loss=16.103, accuracy=0.301/0.327
Epoch 7 : loss=15.123, accuracy=0.343/0.205
Epoch 8 : loss=15.375, accuracy=0.332/0.419
Epoch 9 : loss=15.717, accuracy=0.317/0.371
Epoch 10 : loss=15.494, accuracy=0.327/0.442

Final Test: final accuracy = 0.442
