# Behzad Shomali, Ilaha Manafova
### Exercise Group F


In [None]:
import numpy as np
import matplotlib.pyplot as plt

In [None]:
def parse_training_data(path):
    '''
    Read a training-data text file and return its dimensions and patterns.

    Expected layout (as consumed by this parser):
      * line 1: ignored (free-form header/comment line)
      * line 2: starts with one marker character (e.g. '#') followed by
        whitespace-separated fields whose first character is P, N or M and
        whose value starts at the third character, e.g. ``# P=4  N=2  M=1``
      * line 3 onwards: one pattern per line, whitespace-separated numbers;
        the first N values are the inputs, the last M values the targets.

    Reading stops at the first blank line or at end of file.

    Returns:
        (P, N, M, X_train, Y_train) where X_train has shape (P, N) and
        Y_train has shape (P, M). Both arrays are allocated with np.empty,
        so rows for which the file holds no data line stay uninitialised.

    Raises:
        ValueError: if the header line is missing or lacks P, N or M, or if
            a field cannot be converted to a number.
    '''
    dims = {}
    X_train = Y_train = None

    with open(path, 'r') as file:
        for line_counter, raw_line in enumerate(file, start=1):
            line = raw_line.strip()
            if not line:
                # a blank line terminates the data section
                break

            if line_counter == 2: # the line contains the values of P, N, M
                # skip the marker character, then split on any whitespace
                for segment in line[1:].split():
                    if segment[0] in ('P', 'N', 'M'): # e.g. P=<P_VALUE>
                        dims[segment[0]] = int(segment[2:])

                missing = [key for key in ('P', 'N', 'M') if key not in dims]
                if missing:
                    raise ValueError(
                        f'header line of {path!r} is missing: {", ".join(missing)}'
                    )

                X_train = np.empty((dims['P'], dims['N']))
                Y_train = np.empty((dims['P'], dims['M']))

            elif line_counter >= 3: # training patterns start at line 3
                # split() without arguments copes with tabs and repeated
                # blanks, so no empty tokens have to be filtered out
                values = [float(token) for token in line.split()]
                row = line_counter - 3
                X_train[row] = values[:dims['N']]
                Y_train[row] = values[-dims['M']:]

    if X_train is None:
        raise ValueError(f'{path!r} does not contain a P/N/M header on line 2')

    return dims['P'], dims['N'], dims['M'], X_train, Y_train

In [None]:
def tanh(z):
    '''
    Hyperbolic tangent transfer function, applied element-wise.

    Delegates to np.tanh rather than the explicit exponential quotient:
    for large |z| the quotient evaluates inf/inf and yields nan, whereas
    np.tanh saturates cleanly at -1 / +1.
    '''
    return np.tanh(z)

In [None]:
def tanh_derivative(z):
    '''
    Derivative of tanh with respect to z: 1 - tanh(z)^2 (element-wise).

    Uses np.tanh so that large |z| give exactly 0 instead of nan.
    '''
    return 1 - np.tanh(z)**2

In [None]:
def logistic(z):
    '''
    Logistic (sigmoid) transfer function 1 / (1 + e^(-z)), element-wise.
    '''
    decay = np.exp(-z)
    return 1/(1+decay)

In [None]:
def logistic_derivative(z):
    '''
    Derivative of the logistic function with respect to z,
    expressed through the function value s as s * (1 - s).
    '''
    sigma = logistic(z)
    complement = 1-sigma
    return sigma * complement

In [None]:
def identity(z):
    '''
    Identity (linear) transfer function: hands the argument back unchanged.
    '''
    return z

In [None]:
def identity_derivative(z):
    '''
    Derivative of the identity transfer function: ones with the same
    shape and dtype as z.
    '''
    slope = np.ones_like(z)
    return slope

In [None]:
def initilization(
    layers_num, # number of layers including input and output layer
    layers_neurons, # number of neurons per layer, represented in a list [l1_neurons, l2_neurons, ...] 
    layers_transfer_functions, # transfer function used for each layer, represented in a list [l1_transferFunc, l2_transferFunc, ...] 
    layers_lr, # learning rate used for each layer, represented in a list [l1_lr, l2_lr, ...] 
    random_seed=None # if random_seed is not specified, NumPy's current random state is used as is
):
    '''
    Initialize a network based on the specified params.
    User can set the number of layers, number of neurons 
    of a layer, and also use different transfer functions
    and learning rates for different layers.

    The network is implemented in the format of dictionaries,
    such that the information of each layer and weights can 
    be accessed simply:

      network['layers_num']      -> number of layers
      network['layer_<k>']       -> dict with 'neurons', 'lr',
                                    'transfer_function', 'transfer_derivative'
      network['w_<h><k>']        -> weight matrix connecting layer h to layer
                                    k = h + 1, shape (neurons_h, neurons_k),
                                    drawn uniformly from [-2, 2)

    Transfer-function names 'tanh' and 'logistic' are recognised; any other
    value selects the identity function.

    If random_seed is given, NumPy's global random generator is seeded with
    it so that the drawn weights are reproducible.
    '''
    if random_seed is not None:
        # np.random.seed (not np.random.rand, which merely draws numbers)
        # is what makes the subsequent weight draws reproducible
        np.random.seed(random_seed)

    network = {'layers_num': layers_num}

    for layer in range(layers_num):
        requested = layers_transfer_functions[layer]
        if requested == 'tanh':
            transfer, derivative = tanh, tanh_derivative
        elif requested == 'logistic':
            transfer, derivative = logistic, logistic_derivative
        else: # identity
            transfer, derivative = identity, identity_derivative

        network[f'layer_{layer}'] = {
            'neurons': layers_neurons[layer],
            'lr': layers_lr[layer],
            'transfer_function': transfer,
            'transfer_derivative': derivative,
        }

        if layer > 0:
            # network['w_hk']: indicates the weights/connections
            # that connect layer h to layer k
            shape = (layers_neurons[layer-1], layers_neurons[layer]) # (prev_neurons, cur_neurons)
            # random() is uniform on [0, 1); scale and shift it to [-2, 2)
            network[f'w_{layer-1}{layer}'] = np.random.random(shape) * 4 - 2

    return network

In [None]:
def feed_forward(network, X):
    '''
    Propagate the input X through the network.

    For every weight matrix w_<k><k+1> (k = 0 .. layers_num-2) the incoming
    signal is multiplied by the matrix and then passed through the transfer
    function stored under network['layer_<k>'].

    Returns two dictionaries keyed 'layer_<k>':
      net -> weighted sums produced with w_<k><k+1>
      out -> transfer function applied to those sums
    so the entry with the highest index holds the network output.
    '''
    net, out = {}, {}
    signal = X # what is fed into the next weight matrix

    for layer in range(network['layers_num']-1):
        key = f'layer_{layer}'
        activate = network[key]['transfer_function']

        net[key] = signal.dot(network[f'w_{layer}{layer+1}'])
        out[key] = activate(net[key])
        signal = out[key]

    return net, out

In [None]:
def backpropagation(X, net, out, label, network=None):
    '''
    Compute the delta (error signal) of every layer for one training sample.

    Indexing convention (as produced by feed_forward): net['layer_<k>'] and
    out['layer_<k>'] belong to the neurons of network layer k+1 and were
    computed with the transfer function stored under network['layer_<k>'].

    Params:
      X       -> input sample (assumed 1-D, one value per input neuron)
      net     -> weighted sums per layer, from feed_forward
      out     -> activations per layer, from feed_forward
      label   -> desired output for the sample
      network -> network dictionary; when omitted, the module-level
                 variable `network` is used (the historical behaviour)

    Returns a dict keyed 'layer_<L>' for L = layers_num-1 .. 0:
      output layer : (label - output) * f'(net)
      hidden layer : (W_{L,L+1} . delta_{L+1}) * f'(net)
      input layer  : (W_{0,1} . delta_1) * X
    where f' is the derivative of the transfer function that feed_forward
    actually applied to that layer, evaluated at the weighted sum `net`
    (not at the already-activated `out`).
    '''
    if network is None:
        # keep existing call sites working: fall back to the global network
        network = globals()['network']

    last_layer = network['layers_num'] - 1
    delta = {} # simply keeps delta for each layer's neurons

    # iterate layers from the output layer backwards
    # to backpropagate the error
    for layer in range(last_layer, -1, -1):
        if layer == last_layer: # output layer
            upstream = label - out[f'layer_{layer-1}']
        else:
            # row h of w_<layer><layer+1> holds the weights leaving neuron h,
            # so the matrix-vector product gives every neuron's summed error
            weights = network[f'w_{layer}{layer+1}']
            upstream = weights.dot(np.ravel(delta[f'layer_{layer+1}']))

        if layer >= 1: # output or hidden layer
            # feed_forward used layer-1's transfer function for this layer's
            # neurons, hence its derivative is the matching one
            transfer_func_deriv = network[f'layer_{layer-1}']['transfer_derivative']
            delta[f'layer_{layer}'] = upstream * transfer_func_deriv(net[f'layer_{layer-1}'])
        else: # input layer
            delta[f'layer_{layer}'] = upstream * X

    return delta

In [None]:
def update(network, layer_delta, out, X):
    '''
    Apply one gradient step to every weight matrix of the network, in place.

    For the matrix w_<k><k+1> each weight [i, j] is increased by
        lr_k * delta_{k+1}[j] * input_k[i]
    where lr_k is network['layer_<k>']['lr'], delta_{k+1} comes from
    layer_delta['layer_<k+1>'], and input_k is X for k == 0 and
    out['layer_<k-1>'] otherwise.

    The whole matrix is updated with a single outer product instead of a
    Python loop over individual weights; the arithmetic per weight is the
    same. Nothing is returned.
    '''
    for layer in range(network['layers_num']-1):
        lr = network[f'layer_{layer}']['lr']
        # signal that entered this weight matrix during the forward pass
        inputs = X if layer == 0 else out[f'layer_{layer-1}']
        scaled_delta = lr * np.asarray(layer_delta[f'layer_{layer+1}'])
        # outer(inputs, scaled_delta)[i, j] == inputs[i] * scaled_delta[j]
        network[f'w_{layer}{layer+1}'] += np.outer(inputs, scaled_delta)

In [None]:
def error_function(out, label):
    '''
    Squared error between the network output and the desired output.

    `out` is the activation dictionary returned by feed_forward, holding
    the consecutive keys 'layer_0', 'layer_1', ...; the entry with the
    highest index is taken as the network output, so no access to the
    network dictionary is required.

    The squared differences are summed along axis 0: for a single sample
    (1-D output) this yields one scalar, for a batch of shape P*M it yields
    one cumulative error per output neuron (shape M), which is only useful
    for batch (cumulative) learning.
    '''
    output = out[f'layer_{len(out)-1}']
    errors = (output-label) ** 2 # shape: P*M for a batch
    cumulative_error = np.sum(errors, axis=0)

    return cumulative_error

In [None]:
# Training-data file to load; its basename is reused further down to name
# the learning-curve output file.
path = '/content/data/PA-B_training_data_04.txt'
# P, input_dim and output_dim are the P, N and M values read from the file
# header; X_train / Y_train hold the inputs and the desired outputs.
P, input_dim, output_dim, X_train, Y_train = parse_training_data(path)

In [None]:
# Network hyper-parameters; every list has one entry per layer,
# starting with the input layer.
layers_num = 4 # number of layers including input and output layer
layers_neurons = [input_dim, 16, 8, output_dim]
layers_transfer_functions = ['tanh', 'tanh', 'tanh', 'tanh']
# update() reads the learning rate of the layer a weight matrix starts from,
# so the last entry is not used by it
layers_lr = [5e-3, 5e-3, 1e-3, 1e-4]
iterations = 500 # number of passes over the whole training set

In [None]:
# Build the network dictionary (layer descriptions and weight matrices)
# from the hyper-parameters defined above.
network = initilization(
    layers_num, 
    layers_neurons, 
    layers_transfer_functions,
    layers_lr, 
    random_seed=2000
)

In [None]:
# Online (per-sample) training: for every pattern run a forward pass,
# accumulate its error, backpropagate and update the weights immediately.
global_error = [] # accumulated error of each pass over the training set

# report roughly 25 times per run; max() keeps the modulo below valid
# when fewer than 25 iterations are requested
report_every = max(1, iterations // 25)

# `iteration` (rather than `iter`) keeps the builtin iter() usable afterwards
for iteration in range(iterations):
    error = 0
    for X, Y in zip(X_train, Y_train):
        net, out = feed_forward(network, X)
        # the error is measured before the weights are adjusted for this sample
        # NOTE(review): the division uses len(X), i.e. the input dimension;
        # confirm this normalisation is intended
        error += np.mean(error_function(out, Y))/len(X)
        layer_delta = backpropagation(X, net, out, Y)
        update(network, layer_delta, out, X)
    global_error.append(error)

    if iteration % report_every == 0:
        print(f'Iteration: {iteration:3d}/ Error: {error:.4f}')

# persist the learning curve, one error value per line, in a file named
# after the training-data file
with open(f'./learning_curve_{path.split("/")[-1]}', 'w') as f:
    f.writelines(f"{epoch_error}\n" for epoch_error in global_error)

In [None]:
# Draw the stored learning curves of the four training-data files
# in a 2x2 grid of panels sharing the x axis.
plt.rcParams['figure.figsize'] = (10,6)
fig, ax = plt.subplots(2, 2, sharex=True)

for i, axes_row in enumerate(ax):
    for j, panel in enumerate(axes_row):
        # panels are numbered 1..4 row by row, matching the file names
        file_suffix = f'{2*i + j + 1}'
        with open(f'learning_curve_PA-B_training_data_0{file_suffix}.txt', 'r') as f:
            errors_list = [float(line) for line in f] # one error value per line

        panel.plot(errors_list)
        panel.set_title(f'Learning curve for training_data_{file_suffix}')
        if i == 1: # bottom row carries the x label
            panel.set_xlabel('Iterations')
        if j == 0: # left column carries the y label
            panel.set_ylabel('Error')