In [1]:
import numpy as np
import csv
import time

np.random.seed(1234)

def randomize():
        np.random.seed(time.time())

In [2]:
class Model(object):
    def __init__(self, name, dataset):
        self.name = name
        self.dataset = dataset
        self.is_training = False
        
        if not hasattr(self, 'rand_std'):
            self.rand_std = 0.030
        
    def __str__(self):
        return '{}/{}'.format(self.name, self.dataset)
    
    def exec_all(self, epoch_count = 10, batch_size = 10, learning_rate = 0.001,
                report = 0, show_cnt = 3):
        self.train(epoch_count, batch_size, learning_rate, report)
        self.test()
        if show_cnt > 0:
            self.visualize(show_cnt)

### __init__

is_training flag is only turned on during training, and will be turned off during validation or evaluation process. It is a processing technique that behaves differently when it is learning and when it is not

### exec_all

exec_all serves as the main function that runs the entire process. It calls the training, evaluation, and visualization methods in that order.

exec_All method also passes several hyperparameters specified as parameters to each required method.

In [3]:
class MlpModel(Model):
    def __init__(self, name, dataset, hconfigs):
        super(MlpModel, self).__init__(name, dataset)
        self.init_parameters(hconfigs)

Call the init_parameter method to prepare parameters for the neural network to use. The hidden layer configuration of the multilayer perceptron is determined by the hconfigs argument value passed to the init_parameters() method.

In [4]:
def mlp_init_parameters(self, hconfigs):
    self.hconfigs = hconfigs
    self.pm_hiddens = []
    
    prev_shape = self.dataset.input_shape
    
    for hconfig in hconfigs:
        pm_hidden, prev_shape = self.alloc_layer_param(prev_shape, hconfig)
        self.pm_hiddens.append(pm_hidden)
    
    output_cnt = int(np.prod(self.dataset.output_shape))
    self.pm_output, _ = self.alloc_layer_param(prev_shape, output_cnt)
    
def mlp_alloc_layer_param(self, input_shape, hconfig):
    input_cnt = np.prod(input_shape)
    output_cnt = hconfig
    
    weight, bias = self.alloc_param_pair([input_cnt, output_cnt])
    
    return {'w' : weight, 'b' : bias}, output_cnt

def mlp_alloc_param_pair(self, shape):
    weight = np.random.normal(0, self.rand_std, shape)
    bias = np.zeros([shape[-1]])
    return weight, bias

MlpModel.init_parameters = mlp_init_parameters
MlpModel.alloc_layer_param = mlp_alloc_layer_param
MlpModel.alloc_param_pair = mlp_alloc_param_pair

Information such as input/output vector size is obtained not as a global variable, but as a property value of the dataset object.

Save created parameters as object variables instead of gloabl variables.

In [60]:
def mlp_model_train(self, epoch_count = 10, batch_size = 10, \
                   learning_rate = 0.001, report = 0):
    self.learning_rate = learning_rate
    
    batch_count = int(self.dataset.train_count() / batch_size)
    time1 = time2 = int(time.time())
    
    if report != 0:
        print('Model {} train started:'.format(self.name))
    
    for epoch in range(epoch_count):
        costs = []
        accs = []
        self.dataset.shuffle_train_data(batch_size * batch_count)
        
        for n in range(batch_count):
            trX, trY = self.dataset.get_train_data(batch_size, n)
            cost, acc = self.train_step(trX, trY)
            costs.append(cost)
            accs.append(acc)
            
        if report > 0 and (epoch + 1) % report == 0:
            vaX, vaY = self.dataset.get_validate_data(100)
            acc = self.eval_accuracy(vaX, vaY)
            time3 = int(time.time())
            tm1, tm2 = time3 - time2, time3 - time1
            
            self.dataset.train_prt_result(epoch + 1, costs, accs, acc, tm1, tm2)
            time2 = time3
    
    tm_total = int(time.time()) - time1
    print('Model {} train ended in {} secs'.format(self.name, tm_total))
    
MlpModel.train = mlp_model_train

In [6]:
def mlp_model_test(self):
    teX, teY = self.dataset.get_test_data()
    time1 = int(time.time())
    acc = self.eval_accuracy(teX, teY)
    time2 = int(time.time())
    
    self.dataset.test_prt_result(self.name, acc, time2 - time1)
    
MlpModel.test = mlp_model_test

In [7]:
def mlp_model_visualize(self, num):
    print('Model {} visualization'.format(self.name))
    deX, deY = self.dataset.get_visualize_data(num)
    est = self.get_estimate(deX)
    self.dataset.visualize(deX, est, deY)
    
MlpModel.visualize = mlp_model_visualize

In [8]:
def mlp_train_step(self, x, y):
    self.is_training = True
    
    output, aux_nn = self.forward_neuralnet(x)
    loss, aux_pp = self.forward_postproc(output, y)
    accuracy = self.eval_accuracy(x, y, output)
    
    G_loss = 1.0
    G_output = self.backprop_postproc(G_loss, aux_pp)
    self.backprop_neuralnet(G_output, aux_nn)
    
    self.is_training = False
    
    return loss, accuracy

MlpModel.train_step = mlp_train_step

In [61]:
def mlp_forward_neuralnet(self, x):
    hidden = x
    aux_layers = []
    
    for n, hconfig in enumerate(self.hconfigs):
        hidden, aux = self.forward_layer(hidden, hconfig, self.pm_hiddens[n])
        aux_layers.append(aux)
    
    output, aux_out = self.forward_layer(hidden, None, self.pm_output)
    
    return output, [aux_out, aux_layers]

def mlp_backprop_neuralnet(self, G_output, aux):
    aux_out, aux_layers = aux
    
    G_hidden = self.backprop_layer(G_output, None, self.pm_output, aux_out)
    
    for n in reversed(range(len(self.hconfigs))):
        hconfig, pm, aux = self.hconfigs[n], self.pm_hiddens[n], aux_layers[n]
        G_hidden = self.backprop_layer(G_hidden, hconfig, pm, aux)
    
    return G_hidden

MlpModel.forward_neuralnet = mlp_forward_neuralnet
MlpModel.backprop_neuralnet = mlp_backprop_neuralnet

In [57]:
def mlp_forward_layer(self, x, hconfig, pm):
    y = np.matmul(x, pm['w']) + pm['b']
    if hconfig is not None:
        y = relu(y)
    
    return y, [x, y]

def mlp_backprop_layer(self, G_y, hconfig, pm, aux):
    x, y = aux
    
    if hconfig is not None:
        G_y = relu_derv(y) * G_y
    
    g_y_weight = x.transpose()
    g_y_input = pm['w'].transpose()
   
    G_weight = np.matmul(g_y_weight, G_y)
    G_bias = np.sum(G_y, axis = 0)
    G_input = np.matmul(G_y, g_y_input)
    
    pm['w'] -= self.learning_rate * G_weight
    pm['b'] -= self.learning_rate * G_bias
    
    return G_input

MlpModel.forward_layer = mlp_forward_layer
MlpModel.backprop_layer = mlp_backprop_layer

In [11]:
def mlp_forward_postproc(self, output, y):
    loss, aux_loss = self.dataset.forward_postproc(output, y)
    extra, aux_extra = self.forward_extra_cost(y)
    return loss + extra, [aux_loss, aux_extra]

def mlp_forward_extra_cost(self, y):
    return 0, None

MlpModel.forward_postproc = mlp_forward_postproc
MlpModel.forward_extra_cost = mlp_forward_extra_cost

In [12]:
def mlp_backprop_postproc(self, G_loss, aux):
    aux_loss, aux_extra = aux
    self.backprop_extra_cost(G_loss, aux_extra)
    
    G_output = self.dataset.backprop_postproc(G_loss, aux_loss)
    
    return G_output

def mlp_backprop_extra_cost(self, G_loss, aux):
    pass

MlpModel.backprop_postproc = mlp_backprop_postproc
MlpModel.backprop_extra_cost = mlp_backprop_extra_cost

In [13]:
def mlp_eval_accuracy(self, x, y, output = None):
    if output is None:
        output, _ = self.forward_neuralnet(x)
    
    accuracy = self.dataset.eval_accuracy(x, y, output)
    return accuracy

MlpModel.eval_accuracy = mlp_eval_accuracy

In [14]:
def mlp_get_estimate(self, x):
    output, _ = self.forward_neuralnet(x)
    estimate = self.dataset.get_estimate(output)
    return estimate

MlpModel.get_estimate = mlp_get_estimate

In [15]:
class Dataset(object):
    def __init__(self, name, mode):
        self.name = name
        self.mode = mode
    
    def __str__(self):
        return '{}({}, {}+{}+{})'.format(self.name, self.mode, len(self.tr_xs), len(self.te_xs), len(self.va_xs))
    
    def train_count(self):
        return len(self.tr_xs)

In [16]:
def dataset_get_train_data(self, batch_size, nth):
    from_idx = nth * batch_size
    to_idx = (nth + 1) * batch_size
    
    tr_X = self.tr_xs[self.indices[from_idx : to_idx]]
    tr_Y = self.tr_ys[self.indices[from_idx : to_idx]]
    
    return tr_X, tr_Y

def dataset_shuffle_train_data(self, size):
    self.indices = np.arange(size)
    np.random.shuffle(self.indices)
    

Dataset.get_train_data = dataset_get_train_data
Dataset.shuffle_train_data = dataset_shuffle_train_data

In [17]:
def dataset_get_test_data(self):
    return self.te_xs, self.te_ys

Dataset.get_test_data = dataset_get_test_data

In [18]:
def dataset_get_validate_data(self, count):
    self.va_indices = np.arange(len(self.va_xs))
    np.random.shuffle(self.va_indices)
    
    va_X = self.va_xs[self.va_indices[0 : count]]
    va_Y = self.va_ys[self.va_indices[0 : count]]
    
    return va_X, va_Y

Dataset.get_validate_data = dataset_get_validate_data
Dataset.get_visualize_data = dataset_get_validate_data

In [19]:
def dataset_shuffle_data(self, xs, ys, tr_ratio = 0.8, va_ratio = 0.8):
    data_count = len(xs)
    
    tr_cnt = int(data_count * tr_ratio / 10) * 10
    va_cnt = int(data_count * va_ratio)
    te_cnt = data_count - (tr_cnt + va_cnt)
    
    tr_from, tr_to = 0, tr_cnt
    va_from, va_to = tr_cnt, tr_cnt + va_cnt
    te_from, te_to = tr_cnt + va_cnt, data_count
    
    indices = np.arange(data_count)
    np.random.shuffle(indices)
    
    self.tr_xs = xs[indices[tr_from : tr_to]]
    self.tr_ys = ys[indices[tr_from : tr_to]]
    self.va_xs = xs[indices[va_from : va_to]]
    self.va_ys = ys[indices[va_from : va_to]]
    self.te_xs = xs[indices[te_from : te_to]]
    self.te_ys = ys[indices[te_from : te_to]]
    
    self.input_shape = xs[0].shape
    self.output_shape = ys[0].shape
    
    return indices[tr_from : tr_to], indices[va_from : va_to], indices[te_from : te_to]

Dataset.shuffle_data = dataset_shuffle_data

In [20]:
def dataset_forward_postproc(self, output, y, mode = None):
    if mode is None:
        mode = self.mode
    
    if mode == 'regression':
        diff = output - y
        square = np.square(diff)
        loss = np.mean(square)
        aux = diff
    elif mode == 'binary':
        entropy = sigmoid_cross_with_logits(y, output)
        loss = np.mean(entropy)
        aux = [y, output]
    elif mode == 'select':
        entropy = softmax_cross_entropy_with_logits(y, output)
        loss = np.mean(entropy)
        aux = [output, y, entropy]
    
    return loss, aux

Dataset.forward_postproc = dataset_forward_postproc

In [58]:
def dataset_backprop_postproc(self, G_loss, aux, mode = None):
    if mode is None:
        mode = self.mode
    
    if mode == 'regression':
        diff = aux
        shape = diff.shape
        
        g_loss_square = np.ones(shape) / np.prod(shape)
        g_square_diff = 2 * diff
        g_diff_output = 1
        
        G_square = g_loss_square * G_loss
        G_diff = g_square_diff * G_square
        G_output_ = g_diff_output * G_diff
        
    elif mode == 'binary':
        y, output = aux
        shape = output.shape
        
        g_loss_entropy = np.ones(shape) / np.prod(shape)
        g_entropy_output = sigmoid_cross_entropy_with_logits_derv(y, output)
        
        G_entropy = g_loss_entropy * G_loss
        G_output_ = g_entropy_output * G_entropy
    
    elif mode == 'select':
        output, y, entropy = aux
        
        g_loss_entropy = 1.0 / np.prod(entropy.shape)
        g_entropy_output = softmax_cross_entropy_with_logits_derv(y, output)
        
        G_entropy = g_loss_entropy * G_loss
        G_output_ = g_entropy_output * G_entropy
        
    return G_output_

Dataset.backprop_postproc = dataset_backprop_postproc

In [22]:
def dataset_eval_accuracy(self, x, y, output, mode = None):
    if mode is None:
        mode = self.mode
    
    if mode == 'regression':
        mse = np.mean(np.square(output - y))
        accuracy = 1 - np.sqrt(mse) / np.mean(y)
    elif mode == 'binary':
        estimate = np.greater(output, 0)
        answer = np.equal(y, 1.0)
        correct = np.equal(estimate, answer)
        accuracy = np.mean(correct)
    elif mode == 'select':
        estimate = np.argmax(output, axis = 1)
        answer = np.argmax(y, axis = 1)
        correct = np.equal(estimate, answer)
        accuracy = np.mean(correct)
        
    return accuracy

Dataset.eval_accuracy = dataset_eval_accuracy

In [23]:
def dataset_get_estimate(self, output, mode = None):
    if mode is None:
        mode = self.mode
    
    if mode == 'regression':
        estimate = output
    elif mode == 'binary':
        estimate = sigmoid(output)
    elif mode == 'select':
        estimate = softmax(output)
    
    return estimate

Dataset.get_estimate = dataset_get_estimate

In [24]:
def dataset_train_prt_result(self, epoch, costs, accs, acc, time1, time2):
    print('    Epoch {} : cost = {:5.3f}, accuracy = {:5.3f}/{:5.3f} ({}/{} secs)'.format(epoch, np.mean(costs), np.mean(accs), acc, time1, time2))
    
def dataset_test_prt_result(self, name, acc, time):
    print('    Model {} test report : accoracy = {:5.3f}, ({} secs)\n'.format(name, acc, time))


Dataset.train_prt_result = dataset_train_prt_result
Dataset.test_prt_result = dataset_test_prt_result

In [25]:
class AbaloneDataset(Dataset):
    def __init__(self):
        super(AbaloneDataset, self).__init__('abalone', 'regression')
        
        rows, _ = load_csv("C:/Users/cheol/Downloads/Deep-Learning-Study-main/Deep-Learning-Study-main/Regression_Analysis/archive/abalone.csv")
        
        xs = np.zeros([len(rows), 10])
        ys = np.zeros([len(rows), 1])
        
        for n, row in enumerate(rows):
            if row[0] == 'I':
                xs[n, 0] = 1
            if row[0] == 'M':
                xs[n, 1] = 1
            if row[0] == 'F':
                xs[n, 2] = 1
            
            xs[n, 3:] = row[1:-1]
            ys[n, :] = row[-1:]
            
        self.shuffle_data(xs, ys, 0.8)
        
    def visualize(self, xs, estimates, answers):
        for n in range(len(xs)):
            x, est, ans = xs[n], estimates[n], answers[n]
            xstr = vector_to_str(x, '%4.2f')
            print('{} => estimate {:4.1f} : answers {:4.1f}'.format(xstr, est[0], ans[0]))
            
            

In [26]:
def relu(x):
    return np.maximum(x, 0)

def relu_dev(y):
    return np.sign(y)

In [27]:
def sigmoid(x):
    return np.exp(-relu(-x)) / (1.0 + np.exp(-np.abs(x)))

def sigmoid_derv(y):
    return y * (1 - y)

def sigmoid_cross_entropy_with_logits(z, x):
    return relu(x) - x * z + np.log(1 + np.exp(-np.abs(x)))

def sigmoid_cross_entropy_with_logits_derv(z, x):
    return -z + sigmoid(x)

def tanh(x):
    return 2 * sigmoid(2*x) - 1

def tanh_derv(y):
    return (1.0 + y) * (1.0 - y)

In [28]:
def softmax(x):
    max_elem = np.max(x, axis = 1)
    diff = (x.transpose() - max_elem).transpose()
    exp = np.exp(diff)
    sum_exp = np.sum(exp, axis = 1)
    probs = (exp.transpose() / sum_exp).transpose()
    return probs

def softmax_cross_entropy_with_logits(labels, logits):
    probs = softmax(logits)
    return -np.sum(labels * np.log(probs + 1.0e-10), axis = 1)

def softmax_cross_entropy_with_logits_derv(labels, logits):
    return softmax(logits) - labels

In [29]:
def load_csv(path, skip_header = True):
    with open(path) as csvfile:
        csvreader = csv.reader(csvfile)
        headers = None
        if skip_header:
            headers = next(csvreader, None)
        rows = []
        for row in csvreader:
            rows.append(row)
    
    return rows, headers

In [30]:
def onehot(xs, cnt):
    return np.eye(cnt)[np.array(xs).astype(int)]

def vector_to_str(x, fmt = '%.2f', max_cnt = 0):
    if max_cnt == 0 or len(x) <= max_cnt:
        return '[' + ','.join([fmt]*len(x)) % tuple(x) + ']'
    
    v = x[0:max_cnt]
    
    return '[' + ','.join([fmt]*len(v)) % tuple(v) + ',...]'

In [31]:
ad = AbaloneDataset()

In [32]:
am = MlpModel('abalone_model', ad, [])

In [62]:
am.exec_all(epoch_count = 10, report = 2)

Model abalone_model train started:
    Epoch 2 : cost = 8.128, accuracy = 0.734/0.697 (1/1 secs)
    Epoch 4 : cost = 7.388, accuracy = 0.745/0.715 (0/1 secs)
    Epoch 6 : cost = 7.242, accuracy = 0.748/0.741 (0/1 secs)
    Epoch 8 : cost = 7.133, accuracy = 0.748/0.728 (0/1 secs)
    Epoch 10 : cost = 7.051, accuracy = 0.751/0.720 (0/1 secs)
Model abalone_model train ended in 1 secs


  return _methods._mean(a, axis=axis, dtype=dtype,


    Model abalone_model test report : accoracy =   nan, (5 secs)

Model abalone_model visualization
[1.00,0.00,0.00,0.45,0.36,0.13,0.48,0.19,0.13,0.14] => estimate  8.2 : answers  7.0
[0.00,0.00,1.00,0.61,0.47,0.15,1.03,0.45,0.25,0.28] => estimate 11.1 : answers  9.0
[0.00,0.00,1.00,0.65,0.52,0.21,1.50,0.56,0.32,0.42] => estimate 12.6 : answers 16.0


  ret = ret.dtype.type(ret / rcount)
