In [724]:
import numpy as np
import matplotlib.pyplot as plt
import time

In [725]:
# Store user specified arguments into variables
train_file = 'train_data4.txt'
target_file = 'train_target4.txt'
layer_num = 2
units = [4,1]
activation = 'sigmoid' #'tanh'
loss = 'SSE' #'CE'
learn_rate = 0.1
max_epochs = 1E4 #1E7
batch_size = 10
tolerance = 0.05
output_file = 'output_1.txt'

In [726]:
# Load feature matrix (training inputs) and target vector (regression targets)
feature = np.genfromtxt(train_file, delimiter=' ')

target = np.genfromtxt(target_file, delimiter=' ')
target = target[:, np.newaxis]

In [743]:
# Check the dimension of the training feature and target
print(feature.shape)
print(target.shape)

(200, 5)
(200, 1)


In [728]:
class Activation:
    """

    A class for activation functions and their derivatives

    Attributes
    ----------
    y : (len(x), ) ndarray of float
        the output from the activiation function

    y_prime : (len(x), ) ndarray of float
        the derivative the activiation function

    activation : str
        the name of the activation function specified by the user ('sigmoid' or 'tanh')

    scale_x : float
        a factor that will be multiplied with input x (default 1)

    scale_y : float
        a factor that will be multiplied with result y before returning (default 1)

    Methods
    -------
    sigmoid(x)
        evaluate the sigmoid of input x

    sigmoid_derivative()
        evaluate the derivative of the sigmoid given x

    tanh(x)
        evaluate the tanh of input x

    tanh_derivative()
        evaluate the derivative of the tanh given x

    call(x)
        a driver function to evaluate the given activiation function

    differentiate()
        a driver function to evaluate the derivative of the given activiation function

    """

    def __init__(self, activation):
        """

        Initialize all attributes of the class

        Parameters
        ---------- 
        activation : str
            the name of the activation function specified by the user ('sigmoid' or 'tanh')

        """
        self.y = []
        self.y_prime = []
        self.activation = activation
        self.scale_x = 1
        self.scale_y = 1

    def sigmoid(self, x):
        """

        Evaluate the sigmoid of input x

        y = 1 / (1 + exp(-x * scale_x))

        Parameters
        ---------- 
        x : (len(x),) ndarray of float
            input data

        Returns
        -------
        y : (len(x), ) ndarray of float
            output from the sigmoid function

        """

        self.y = 1 / (1 + np.exp(-x * self.scale_x))

        return self.y

    def sigmoid_derivative(self):
        """

        Evaluate the derivative of the sigmoid given x

        Given y,
        y' = y * (1 - y) * scale_x

        Returns
        -------
        y_prime : (len(x), ) ndarray of float
            output from differentiating the sigmoid function

        """

        self.y_prime = self.y * (1 - self.y) * self.scale_x

        return self.y_prime

    def tanh(self, x):
        """

        Evaluate the tanh of input x

        y =  tanh(x_scale * x) * y_scale
        tanh(z) = (exp(z) - exp(-z)) / (exp(z) + exp(-z))

        Parameters
        ----------
        x : (len(x), ) ndarray of float
            input data

        Returns
        -------
        y : (len(x), ) ndarray of float
            output from the tanh function

        """

        self.y = np.tanh(x * self.scale_x) * self.scale_y

        return self.y

    def tanh_derivative(self):
        """

        Evaluate the derivative of the tanh given x

        Given y,
        y' = (scale_y - y) * (scale_y + y) * (scale_x / scale_y) 

        Returns
        -------
        y_prime : (len(x), ) ndarray of float
            output from differentiating the tanh function

        """

        self.y_prime = (self.scale_y - self.y) * \
            (self.scale_y + self.y) * (self.scale_x / self.scale_y)

        return self.y_prime

    def call(self, x):
        """

        A driver function to evaluate the given activiation function

        Parameters
        ----------
        x : (len(x), ) ndarray of float
            input data

        Returns
        -------
        y : (len(x), ) ndarray of float
            output from the given activiation function

        """

        if self.activation == 'sigmoid':

            return self.sigmoid(x)

        elif self.activation == 'tanh':

            return self.tanh(x)

    def differentiate(self):
        """

        A driver function to evaluate the derivative of the given activiation function

        Returns
        -------
        y : (len(x), ) ndarray of float
            output from the given activiation function

        """

        if self.activation == 'sigmoid':

            return self.sigmoid_derivative()

        elif self.activation == 'tanh':

            return self.tanh_derivative()

In [729]:
class Dense:
    """

    A class containing the data structure for a dense layer, and some helper functions

    Attributes
    ----------
    n_output : int
        number of neurons in the layer

    activation : Activation
        a class for evaluating the activation function and its derivative

    weight : (n_input + 1, n_output) ndarray of float
        weight matrix of the layer, accounting for a bias term

    prev_wght_update : (n_input + 1, n_output) ndarray of float
        record the previous weight matrix

    input : (n_input + 1, ) ndarray of float
        input to the layer with an additional bias term, [x_1, ... , x_n, 1]

    output : (n_output, ) ndarray of float
        output the layer, [y_1, ... , y_m]

    Methods
    -------
    build(n_input)
        initialize weight, prev_wght_update, and output with appropriate dimension;
        initialize weight matrix with random numbers from -1 to 1

    call(x)
        compute the output of the layer given input vector x

    """

    def __init__(self, n_output_nodes, activation):
        """

        Initialize the layer by specifying the number of neurons and the activation function

        Parameters
        ---------- 
        n_output_nodes : int
            number of neurons in the layer

        activation : str
            the activiation function used in the layer

        """

        self.n_output = n_output_nodes
        self.activation = Activation(activation)

    def build(self, n_input):
        """

        Initialize weight, prev_wght_update, and output with appropriate dimension;
        Initialize weight matrix with random numbers from -1 to 1

        Parameters
        ----------
        n_input : int
            number of input data (dimension of the features or number of neurons in the previous layer)

        """

        self.weight = np.random.uniform(
            low=-1, high=1, size=(n_input + 1, self.n_output))
        self.output = np.zeros((1, self.n_output))
        self.prev_wght_update = np.zeros((n_input + 1, self.n_output))

    def call(self, x):
        """

        Compute the output of the layer given input vector x
        
        y = activation_function(x @ weight)
        (n_output, ) = activation_function((n_input + 1, ) @ (n_input + 1, n_output))

        Parameters
        ----------
        x : (n_input, ) ndarray of float
            input to the layer, [x_1, ... , x_n]

        Returns
        -------
        output : (n_output, ) ndarray of float
            output the layer, [y_1, ... , y_m]

        """

        self.input = np.append(x, 1)
        v = np.matmul(self.input, self.weight)
        self.output = self.activation.call(v)

        return self.output

In [730]:
# Testing
np.random.seed(2020)
layer = Dense(4, activation='sigmoid')
layer.build(feature.shape[1])
layer.call(feature[1])

array([0.87265074, 0.53823522, 0.26966516, 0.57152884])

In [731]:
class Losses:
    """

    A class for loss functions and their gradients

    Attributes
    ----------
    loss : str
        the name of the loss function specified by the user ('SSE' or 'CE')

    Methods
    -------
    squared_error(y_true, y_pred)
        evaluate the squared error loss

    cross_entropy(y_true, y_pred)
        evaluate the cross entropy loss

    squared_error_grad(y_true, y_pred)
        evaluate the tanh of input x

    cross_entropy_grad(y_true, y_pred)
        evaluate the derivative of the tanh given x

    __call__(y_true, y_pred)
        a driver function to evaluate the given activiation function

    gradient(y_true, y_pred)
        a driver function to evaluate the derivative of the given activiation function

    """

    def __init__(self, loss):
        self.loss = loss
        
    def squared_error(self, y_true, y_pred):
        return (y_true - y_pred) ** 2
    
    def cross_entropy(self, y_true, y_pred):
        return - y_true * np.log(y_pred) - (1 - y_true) * np.log(1 - y_pred)
    
    def squared_error_grad(self, y_true, y_pred):
        return y_true - y_pred
    
    def cross_entropy_grad(self, y_true, y_pred):
        return (y_true - y_pred) / (y_pred - y_pred ** 2)
    
    def __call__(self, y_true, y_pred):
        if self.loss == 'SSE':
            return self.squared_error(y_true, y_pred)
        elif self.loss == 'CE':
            return self.cross_entropy(y_true, y_pred)

    def gradient(self, y_true, y_pred):
        if self.loss == 'SSE':
            return self.squared_error_grad(y_true, y_pred)
        elif self.loss == 'CE':
            return self.cross_entropy_grad(y_true, y_pred)

In [741]:
class Multilayers:
    def __init__(self, x_shape, layers, input_scale=1, output_scale=1):
        self.model = layers
        self.n_layers = len(layers)

        n_input = x_shape
        for layer in self.model:
            layer.build(n_input)
            layer.activation.x_scale = input_scale
            layer.activation.y_scale = output_scale
            n_input = layer.n_output

    def predict(self, x):

        input_signal = x
        for i, layer in zip(range(self.n_layers), self.model):
            output_signal = layer.call(input_signal)
            
            input_signal = output_signal

        self.y = output_signal
        return self.y

    def back_prop(self, target, learn_rate, loss, momentum=0):
        prev_update = 0
        
        i = 0
        for layer in reversed(self.model):
                           
            prime = layer.activation.differentiate()
            
            if i == 0:
                
                errors = loss.gradient(target, self.y)
                
            else:
                
                errors = np.inner(deltas, prev_weight[:-1])
                
            deltas = errors * prime
            
            prev_weight = np.copy(layer.weight)
            
            update = learn_rate * np.outer(layer.input, deltas)
            
            layer.weight += momentum * layer.prev_wght_update + update
            
            layer.prev_wght_update = update
            
            i += 1

In [742]:
np.random.seed(2020)

model = Multilayers(feature.shape[1],[
    Dense(5, activation='sigmoid'),
    Dense(3, activation='sigmoid'),
    Dense(2, activation='sigmoid')
])

print('Before back prop:', model.predict(feature[0]))
model.back_prop(target[0], learn_rate=0.1, loss=loss_function)
print('After 1 back prop:', model.predict(feature[0]))
model.back_prop(target[0], learn_rate=0.1, loss=loss_function)
print('After 2 back prop:', model.predict(feature[0]))
model.back_prop(target[0], learn_rate=0.1, loss=loss_function)
print('After 3 back prop:', model.predict(feature[0]))

Before back prop: [0.36477282 0.17336076]
(2,) (2,) (2,)
(3,) (3,) (3,)
(5,) (5,) (5,)
After 1 back prop: [0.27658303 0.12676493]
(2,) (2,) (2,)
(3,) (3,) (3,)
(5,) (5,) (5,)
After 2 back prop: [0.20351656 0.09080108]
(2,) (2,) (2,)
(3,) (3,) (3,)
(5,) (5,) (5,)
After 3 back prop: [0.14542603 0.06356227]


In [734]:
def train(model, feature, target, loss='SSE', learn_rate=0.1, batch_size=9, momentum=0):
    epoch_size = len(feature)
    batch_num = int(np.ceil(epoch_size / batch_size))

    order = np.random.choice(range(epoch_size), size=epoch_size, replace=False)
    loss_function = Losses(loss)
    history = []

    this_batch_size = 0
    i = 0
    for batch_i in range(batch_num):
        if batch_i == batch_num - 1 and epoch_size % batch_size != 0:
            this_batch_size = epoch_size % batch_size
        else:
            this_batch_size = batch_size
        
        batch_count = 0
        batch_error = 0
        while batch_count < this_batch_size:
            pos = order[i]
            y_pred = model.predict(feature[pos])
            y_true = target[pos]
            model.back_prop(y_true, learn_rate, loss_function, momentum)
            
            batch_error += np.mean(loss_function(y_true, y_pred))
                                   
            batch_count += 1
            i += 1
            
        mean_batch_error = batch_error / this_batch_size
        history.append(mean_batch_error)
        
    return history

In [735]:
def one_hot_encoding(label_vector):
    binary_rep = ['{0:b}'.format(int(label)) for label in label_vector]
    max_bit_num = max([len(binary_label) for binary_label in binary_rep])
    binary_rep = [binary_label.zfill(max_bit_num) for binary_label in binary_rep]
    binary_rep = [[int(bit) for bit in binary_label] for binary_label in binary_rep]
    binary_rep = np.array(binary_rep)
    return binary_rep

In [736]:
periodic_output = 2E2
plt.rcParams['figure.figsize'] = (16,6)

In [737]:
file_num = 1

# Store user specified arguments into variables
train_file = f'train_data{file_num}.txt'
target_file = f'train_target{file_num}.txt'

feature = np.genfromtxt(train_file, delimiter=' ')
target = np.genfromtxt(target_file, delimiter=' ')
target = target[:, np.newaxis]

layer_num = 3
units = [5,3,1]
activation = 'tanh' #'tanh' 'sigmoid'
loss = 'SSE' #'CE' 'SSE'
learn_rate = 0.5 #0.05
max_epochs = 5E3 #1E4
batch_size = 20
tolerance = 1E-4
output_file = f'output{file_num}.txt'

momentum = 0.1 #0.1

if loss == 'CE':
    target = one_hot_encoding(target)



In [738]:
print(f'Loss: {loss}; Learning Rate: {learn_rate:.4f}; Batch Size: {batch_size}')
#target_range = target.max() - target.min()
target_range = 4

model = Multilayers(feature.shape[1], 
                    [Dense(n_output_nodes, activation) for n_output_nodes in units], 
                    output_scale=target_range,
                    input_scale=1)

min_batch_error = tolerance + 1
error = []
history = []

start_time = time.time()
i = 0
while min_batch_error > tolerance and i < max_epochs:
    error = train(model, feature, target, loss, learn_rate, batch_size, momentum=momentum)
    min_batch_error = min(error)
    error = np.mean(error) # Delete
    history = np.append(history, error)
    
    if i % periodic_output == 0:
        print(i, min_batch_error)

    i += 1
    
    
print(f"--- {(time.time() - start_time):.6f}s seconds ---")
print(i, min_batch_error)
np.savetxt(output_file, history, delimiter=' ')

plt.title(f'Loss: {loss}; Learning Rate: {learn_rate:.2f}; Batch Size: {batch_size}')
plt.plot(history, color='red')
plt.plot([0, len(history)], [0, 0])
plt.yscale('log')
plt.show()
print()

Loss: SSE; Learning Rate: 0.5000; Batch Size: 20
0 1.6880425096481495
1000 1.3973344450422558
2000 1.0005286566656406
3000 1.5084671260818885
4000 1.4628810873395799


KeyboardInterrupt: 

In [None]:
def lookinside(model):
    print('------Weights------')
    print()
    for layer in model.model:
        print(layer.weight)
        print()

    print()
    print('------Outputs------')
    print()
    for layer in model.model:
        print(layer.output)
        print()

In [None]:
plt.rcParams['figure.figsize'] = (6,6)
for i in range(feature.shape[1]):
    #plt.hist(feature[:,i])
    plt.plot(feature[:,i], target, 'o')
    plt.show()

In [None]:
transform = Activation(activation)
transform.scale_y = 3
transform.scale_x = 1
x = np.arange(-3,3,0.001)
y = transform.call(x)
plt.plot(x,y)