In [36]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from numba import njit
import random


In [37]:

# Backpropagation algorithm
# default config for the backpropagation algo
l_rate = 0.01
max_it = 200
min_error = 0.1
# obs: changing the learning_rate and the initial weight can improve the performance
# the current config is the optimal config found for the hyperbolic tangent until this moment

# open the csv file and get the data


def get_training_data(filename):
    with open(filename) as f:
        data = pd.read_csv(f)
        data = data.values
    return data

# choose which curve will be used by the neurons of the net

# @njit(parallel=True, fastmath=True, cache=True)

@njit(parallel=True, fastmath=True, cache=True)
def fx_tanh(x):
    return np.tanh(x)

@njit(parallel=True, fastmath=True, cache=True)
def d_fx_tanh(x):
    return (1/np.cosh(x))**2

@njit(parallel=True, fastmath=True, cache=True)
def fx_logistic(x):
    return 1/(1+np.exp(-x))

@njit(parallel=True, fastmath=True, cache=True)
def d_fx_logistic(x):
    return np.exp(x)/(1+np.exp(x))**2

def choose_curve(curve):
    if curve == "tanh":
        return fx_tanh, d_fx_tanh
    elif curve == "logistic":
        return fx_logistic, d_fx_logistic
    else:
        print("OPTION NOT VALID")
        exit()
    

# initialize the weights as small random values


@njit(parallel=True, fastmath=True, cache=True)
def initialize_weights(num_classes: int, num_features: int):
    # number of inputs (class features)
    num_inputs = num_features
    # number of output neurons
    num_outputs = num_classes
    # number of hidden neurons (geometry mean of inputs and outputs)
    num_hidden = int(np.sqrt(num_inputs*num_outputs))
    # initialize the weights
    # add 0.005 in order to prevent 0
    weight_hidden = np.random.rand(num_hidden, num_inputs)/100 + 0.0005
    weight_output = np.random.rand(num_outputs, num_hidden) * 3 - 1.5
    # initialize the bias, in this case all bias = 0
    bias_hidden = np.zeros((num_hidden, num_inputs))
    bias_output = np.zeros((num_outputs, num_hidden))
    return weight_hidden, weight_output, bias_hidden, bias_output

# probably not used anymore..., too lazy to check...
# calculate the output for a single neuron with the previous layer output as input
# @njit


def neuron_output(weights, fx, inputs):
    net = np.dot(inputs, weights)
    # weighted_input = inputs
    # for i in range (0, len(weights)):
    #  weighted_input[0] *= weights[0]
    # net = weighted_input.sum()
    return fx(net)

# calculate the output from each neuron of the given layer
# each layer is described by set of weights of each neuron

# @njit


def calculate_layer(weights_layer, fx, layer_inputs):
    net = weights_layer @ np.array(layer_inputs)
    outputs = []
    for i in range(0, len(net)):
        outputs.append(fx(net[i]))
    return outputs
    # return fx(net)

# calculate the expected output for the class


def calculate_expected_values(curve_type, class_number, num_classes):
    expected_values = 0
    if curve_type == "logistic":
        expected_values = np.zeros(num_classes)
        expected_values[class_number-1] = 1
    elif curve_type == "tanh":
        expected_values = np.full(num_classes, -1)
        expected_values[class_number-1] = 1
    else:
        exit()
    return expected_values

# calculate the error for the output layer


def calculate_output_error(curve, weights, expected, attained, inputs):
    fx, d_fx = choose_curve(curve)
    error = []
    for i in range(0, len(expected)):
        err = (expected[i] - attained[i])*d_fx(inputs[i])
        error.append(err)
    return error

# calculate the error for the hidden layer
# needs revision


def calculate_hidden_error(curve, weights_hidden, weights_output, error_output, inputs):
    fx, d_fx = choose_curve(curve)
    error = []
    for i in range(0, len(weights_hidden)):
        err = np.dot(error_output, weights_output[i])
        err *= d_fx(np.dot(inputs, weights_hidden[i]))
        error.append(err)
    return error

# adjust the weights of a given layer according to the layer error


def adjust_weights(weights, learning_rate, error, layer_input):
    new_weights = []
    for i in range(0, len(weights)):
        neuron_weights = []
        for j in range(0, len(weights[0])):

            neuron_weights.append(
                weights[i][j] + learning_rate*error[i]*layer_input[j])
        new_weights.append(neuron_weights)
    return new_weights


Separação das amostras


In [38]:
# creio que a separacao de amostras seja desnecessaria
# algumas observacoes:
# a base de treinamento esta desbalanceada
# ~1/4 dos dados sao da classe 3 enquanto ~1/4 sao das classes 1 e 2
# durante o treinamento de dados, seria interessante tentar balancear os dados
# dividindo o conjunto de treino em 5 conjuntos, cada um contendo apenas as
# amostras de sua respectiva classe, e permutar ciclicamente cada conjunto.
# e.g., treinar com uma amostra do conjunto1 -> treinar com uma amostra do conjunto2 -> conjunto3 -> conjunto4 -> conjunto5 -> conjunto1 -> ...

def choose_sample(class_number, data):
    sample_size = len(data)
    random_sample = random.randint(0, sample_size-1)
    sample = data[random_sample]
    while (sample[-1] != class_number):
        random_sample = random.randint(0, sample_size-1)
        sample = data[random_sample]
    return sample


Treinamento da rede


In [39]:
# training with an unbalanced dataset
# train a neuron net with a specificied curve and data
def train_through_data(curve, data):
    # get information from the dataset
    num_classes = np.unique(data[:, -1]).size
    num_features = data.shape[1] - 1
    sample_size = data.shape[0]

    # remove the last column from the inputs list, i.e., remove the class type from inputs
    inputs = np.delete(data, len(data[0])-1, 1)

    # initialize the initial weights
    weight_hidden, weight_output, bias_hidden, bias_output = initialize_weights(
        num_classes, num_features)
    # get the function for the choosen curve
    fx, d_fx = choose_curve(curve)

    # train until max_it
    for j in range(0, max_it):
        # train the net for sample in dataset
        for i in range(0, sample_size):
            # choose a random sample in order to prevent overfitting
            new_i = random.randint(0, sample_size-1)

            # calculate the expected value for this set of features
            expected_values = calculate_expected_values(
                curve, data[new_i][-1], num_classes)

            # calculate the value for the hidden layer and the output layer
            hidden_values = calculate_layer(weight_hidden, fx, inputs[new_i])
            output_values = calculate_layer(weight_output, fx, hidden_values)

            # calculate the error for each layer
            error_output_layer = calculate_output_error(
                curve, weight_output, expected_values, output_values, hidden_values)
            error_hidden_layer = calculate_hidden_error(
                curve, weight_hidden, weight_output, error_output_layer, inputs[new_i])
            # print("Expected: ")
            # print(expected_values)
            # print("Output: ")
            # print(output_values)

            # adjust weights for each layer
            weight_output = adjust_weights(
                weight_output, l_rate, error_output_layer, hidden_values)
            weight_hidden = adjust_weights(
                weight_hidden, l_rate, error_hidden_layer, inputs[new_i])
        if (j % 100 == 0):
            print(j)
    # return the trained values for each layer
    return weight_hidden, weight_output


'''

#experimental training with a balanced dataset
def train_through_data(curve, data):
  #get information from the dataset
  num_classes = np.unique(data[:, -1]).size
  num_features = data.shape[1] - 1
  sample_size = len(data)

  #initialize the initial weights
  weight_hidden, weight_output, bias_hidden, bias_output = initialize_weights(num_classes, num_features)
  #get the function for the choosen curve
  fx, d_fx = choose_curve(curve)
  
  current_class = 1

  #train until max_it
  for j in range(0, max_it):
    #train the net for sample in dataset
    for i in range(0, sample_size):
      #choose a random sample in order to prevent overfitting
      sample = choose_sample(current_class, data)
      
      #separate the class from the inputs list, i.e., remove the class type from inputs
      sample_class = sample[-1]
      inputs = np.delete(sample, len(sample)-1, 0)

      #calculate the expected value for this set of features
      expected_values = calculate_expected_values(curve, sample_class, num_classes)
      
      #calculate the value for the hidden layer and the output layer
      hidden_values = calculate_layer(weight_hidden, fx, inputs)
      output_values = calculate_layer(weight_output, fx, hidden_values)
      
      #calculate the error for each layer
      error_output_layer = calculate_output_error(curve, weight_output, expected_values, output_values, hidden_values)
      error_hidden_layer = calculate_hidden_error(curve, weight_hidden, weight_output, error_output_layer, inputs)
      #print("Expected: ")
      #print(expected_values)
      #print("Output: ")
      #print(output_values)
      
      #adjust weights for each layer
      weight_output = adjust_weights(weight_output, l_rate, error_output_layer, hidden_values)
      weight_hidden = adjust_weights(weight_hidden, l_rate, error_hidden_layer, inputs)

      current_class %= num_classes
      current_class += 1
    if(j % 100 == 0):
      print(j)
  #return the trained values for each layer
  return weight_hidden, weight_output
'''


'\n\n#experimental training with a balanced dataset\ndef train_through_data(curve, data):\n  #get information from the dataset\n  num_classes = np.unique(data[:, -1]).size\n  num_features = data.shape[1] - 1\n  sample_size = len(data)\n\n  #initialize the initial weights\n  weight_hidden, weight_output, bias_hidden, bias_output = initialize_weights(num_classes, num_features)\n  #get the function for the choosen curve\n  fx, d_fx = choose_curve(curve)\n  \n  current_class = 1\n\n  #train until max_it\n  for j in range(0, max_it):\n    #train the net for sample in dataset\n    for i in range(0, sample_size):\n      #choose a random sample in order to prevent overfitting\n      sample = choose_sample(current_class, data)\n      \n      #separate the class from the inputs list, i.e., remove the class type from inputs\n      sample_class = sample[-1]\n      inputs = np.delete(sample, len(sample)-1, 0)\n\n      #calculate the expected value for this set of features\n      expected_values = c

Testagem da rede


In [40]:
# the assigned class will be the index with the higher value
def assign_class(output):
    return output.index(max(output)) + 1

# create a confusion matrix


# def confusion_matrix():
#     return


def test_network(curve, weight_hidden, weight_output, data):
    # get information from the dataset
    num_classes = np.unique(data[:, -1]).size
    confusion_matrix = np.zeros((num_classes, num_classes))

    num_features = data.shape[1] - 1
    sample_size = data.shape[0]
    # remove the last column from the inputs list, i.e., remove the class type from inputs
    inputs = np.delete(data, len(data[0])-1, 1)

    fx, d_fx = choose_curve(curve)
    assigned_class = []
    true_class = []
    count_errors = 0
    for i in range(0, sample_size):
        # calculate the value for the hidden layer and the output layer
        hidden_values = calculate_layer(weight_hidden, fx, inputs[i])
        output_values = calculate_layer(weight_output, fx, hidden_values)

        # debug output
        assigned_class.append(assign_class(output_values))
        true_class.append(data[i][-1])
        if (true_class[i] != assigned_class[i]):
            count_errors += 1
        # print("Expected: ", true_class[i], " Attained: ", assigned_class[i])

        # debug output
        expected_values = calculate_expected_values(
            curve, data[i][-1], num_classes)
        # print("Expected: ")
        # print(expected_values)
        # print("Output: ")
        # print(output_values)
        confusion_matrix[true_class[i]-1][assigned_class[i]-1] += 1

    print('Matriz de confusão')
    print(confusion_matrix)

    # debug output
    print("Erros: ")
    print(count_errors)
    print("Total de amostras: ")
    print(sample_size)
    # print("Peso oculta: ")
    # print(weight_hidden)
    # print("Peso saida: ")
    # print(weight_output)
    return


Programa


In [41]:
def backpropagation_algo():
    # curve = "tanh"
    curve = "logistic"
    data = get_training_data("treinamento.csv")
    weight_hidden, weight_output = train_through_data(curve, data)

    data = get_training_data("teste.csv")
    test_network(curve, weight_hidden, weight_output, data)
    return


In [42]:
backpropagation_algo()


0
100
Matriz de confusão
[[ 36.   0.  20.   0.   0.]
 [  0.   0.   0.  53.   0.]
 [  0.   0. 102.   0.   0.]
 [  0.   0.   0.  75.   0.]
 [  0.   0.   0.   0.  66.]]
Erros: 
73
Total de amostras: 
352
