## Utility Functions

In [1]:
import numpy as np


def linear(x):
    """Identity activation: hand the net input back unchanged."""
    identity_output = x
    return identity_output


def relu(x):
    """Rectified linear unit: element-wise maximum of the input and zero."""
    floor = 0
    rectified = np.maximum(x, floor)
    return rectified


def sigmoid(x):
    """Logistic sigmoid, 1 / (1 + e^(-x)), applied element-wise."""
    denominator = 1.0 + np.exp(-x)
    return 1.0 / denominator


def softmax(x):
    """
    Softmax of the input, normalised along axis 0.

    The global maximum is subtracted before exponentiating so the exponentials
    cannot overflow; this shift does not change the result.
    """
    shifted = x - np.max(x)
    exponentials = np.exp(shifted)
    normaliser = exponentials.sum(axis=0)
    return exponentials / normaliser

## MLPClassifier Implementation

Parameter yang ada pada kelas MLPClassifier yaitu: **struktur jaringan (jumlah layer, jumlah neuron setiap layer, fungsi aktivasi setiap layer), initial weights tiap neuron, learning_rate, error_threshold, max_iter, batch_size**

In [2]:
import math
import numpy as np


class FFNNLayer:
    """Description of one network layer: its width and its activation function."""

    def __init__(self, number_of_neurons: int, activation_function: str):
        """
        :param number_of_neurons: how many neurons the layer contains
        :param activation_function: name of the activation applied by the layer
        """
        self.activation_function = activation_function
        self.number_of_neurons = number_of_neurons


class MLPClassifier:
    """
    Feed-forward neural network trained with mini-batch gradient descent.

    The activation helpers ``linear``, ``relu``, ``sigmoid`` and ``softmax`` must be defined at
    module level; they are looked up when a forward pass runs.
    """

    def __init__(self, layers: list, learning_rate, error_threshold, max_iter, batch_size, weights, stopped_by, expected_weights = None):
        """
        :param layers: list of FFNNLayer to specify the activation function and num of neurons for each layers
        :param learning_rate: the learning rate applied to the accumulated weight deltas
        :param error_threshold: training stops once the mean batch error of an iteration is <= this value
        :param max_iter: max iter to stop iteration
        :param batch_size: the size of batch for each mini batch
        :param weights: initial weights, one matrix per layer; row 0 of every matrix holds the bias
                        weights and the remaining rows hold the weights of each input
        :param stopped_by: the expected stop reason, only used in the printed comparison report
        :param expected_weights: optional expected final weights (same layout as ``weights``), used by
                                 ``calculate_sse`` and the printed comparison report
        """
        self.num_of_layers = len(layers)
        self.layers = layers
        self.learning_rate = learning_rate
        self.error_threshold = error_threshold
        self.error_sum = 1
        self.max_iter = max_iter
        self.batch_size = batch_size
        self.X_train = []
        self.y_train = []
        self.neuron_values = []
        # split every layer matrix into its bias row and its input-weight rows
        self.weights = [weight[1:] for weight in weights]
        self.bias_weights = [weight[0] for weight in weights]
        self.prediction = []
        self.num_of_features = 0
        self.num_of_batches = 0
        self.d_weights = None
        self.d_bias_weights = None
        self.expected_stopped_by = stopped_by
        self.expected_weights = expected_weights
        self.expected_output = None
        self.stopped_by = None
        self.current_inputs = None

    def fit(self, X_train, y_train):
        """
        Train the network until ``max_iter`` iterations ran or the mean batch error of an
        iteration drops to ``error_threshold`` or below.

        :param X_train: training samples (sequence of feature vectors)
        :param y_train: training targets, one target vector per sample

        Sets ``stopped_by`` to "max_iteration" or "error_threshold" and prints the
        expected/actual weight report when expected weights were supplied.
        """
        self.X_train = X_train
        self.y_train = y_train
        # NOTE: despite its name this attribute stores the number of samples (kept as-is)
        self.num_of_features = len(self.X_train)
        # the number of batches does not change between iterations, so compute it once
        self.num_of_batches = math.ceil(len(self.X_train) / self.batch_size)

        # placeholder for the outputs of every layer; overwritten by each forward pass
        self.neuron_values = [[None for _ in range(layer.number_of_neurons)] for layer in self.layers]
        num_iter = 0
        while num_iter < self.max_iter:
            err = 0
            for i in range(self.num_of_batches):
                self.__forward(i)
                self.__backward(i)
                # the error uses the prediction of the forward pass above,
                # i.e. the one made before this batch's weight update
                err += self.__calculate_error(i)

            # Update the average error for this iteration
            self.error_sum = err / self.num_of_batches

            # Check if the error is below the threshold
            if self.error_sum <= self.error_threshold:
                break

            num_iter += 1

        self.stopped_by = "max_iteration" if num_iter == self.max_iter else "error_threshold"

        if self.expected_weights:
            self.__print_final_weights()

    def predict(self, X_test):
        """Perform forward pass to make predictions on input X_test

        Args:
            X_test: Input data for prediction (list of samples)

        Returns:
            Output-layer values for each sample in X_test, as nested Python lists

        Raises:
            ValueError: if a layer uses an unknown activation function
        """
        current_inputs = np.array(X_test)
        for i in range(self.num_of_layers):
            net = np.matmul(current_inputs, self.weights[i]) + self.bias_weights[i]
            current_inputs = self.__activate(self.layers[i].activation_function, net)
        # the activations are a list of arrays; convert them to plain nested lists
        return np.asarray(current_inputs).tolist()

    def calculate_sse(self):
        """
        Sum of squared differences between the expected and the current weights.

        Row 0 of every expected layer matrix is compared with the bias weights, the
        remaining rows with the input weights. Requires ``expected_weights`` to be set.
        """
        sse = 0
        for layer in range(len(self.expected_weights)):
            for neuron in range(len(self.expected_weights[layer])):
                expected = np.array(self.expected_weights[layer][neuron])
                result = self.bias_weights[layer] if neuron == 0 else self.weights[layer][neuron-1]
                squared_error = (expected - result) ** 2
                sse += np.sum(squared_error)
        return sse

    def __activate(self, act_func: str, net) -> list:
        """
        Apply the named activation function to the net input of every sample.

        :param act_func: 'linear', 'relu', 'sigmoid' or 'softmax'
        :param net: iterable with one net-input vector per sample
        :return: list with one activated vector per sample
        :raises ValueError: if ``act_func`` is not one of the supported names
        """
        if act_func == 'linear':
            return [linear(n) for n in net]
        if act_func == 'relu':
            return [relu(n) for n in net]
        if act_func == 'sigmoid':
            return [sigmoid(n) for n in net]
        if act_func == 'softmax':
            return [softmax(n) for n in net]
        raise ValueError(f"Unknown activation function: {act_func}")

    def __forward(self, batch):
        """
        Forward-propagate one mini batch and remember the outputs of every layer.

        :param batch: the index of the batch that is processed
        """
        start_idx = self.batch_size * batch
        end_idx = start_idx + self.__get_curr_batch_size(batch)
        self.expected_output = self.y_train[start_idx:end_idx]
        self.current_inputs = self.X_train[start_idx:end_idx]
        res = self.current_inputs
        for i in range(self.num_of_layers):
            net = [np.matmul(x, self.weights[i]) + self.bias_weights[i] for x in res]
            res = self.__activate(self.layers[i].activation_function, net)
            self.neuron_values[i] = res
        self.prediction = list(self.neuron_values[-1])

    def __backward(self, batch_idx):
        """
        do backward propagation for each batch
        :param batch_idx: the current batch that is processed
        """
        self.__init_d_weights()
        # get the current batch size
        batch_size = self.__get_curr_batch_size(batch_idx)

        # for each X in the batch
        for i in range(batch_size):
            d_k = np.zeros(0)
            # walk from the output layer back to the first layer
            for j in range(self.num_of_layers - 1, -1, -1):
                if j == self.num_of_layers - 1:
                    delta = self.__calc_output_layer_delta(i)
                else:
                    delta = self.__calc_hidden_layer_delta(i, j, d_k)
                # the input of layer j is the sample itself for the first layer,
                # otherwise the output of the previous layer
                x = self.current_inputs[i] if j == 0 else self.neuron_values[j - 1][i]
                # entry [a][b] of the outer product is x[a] * delta[b]
                self.d_weights[j] += np.outer(x, delta)
                self.d_bias_weights[j] += np.array(delta)
                # column vector consumed by the next (earlier) layer
                d_k = delta.reshape(delta.shape[0], 1)

        # weights are only updated after the whole batch has been accumulated
        self.__update_weights()

    def __calculate_error(self, batch_idx):
        """
        Calculate the error for the current batch
        :param batch_idx: the current batch that is processed
        :return: half the sum of squared errors for relu/sigmoid/linear outputs,
                 the cross-entropy for softmax outputs
        :raises ValueError: if the output layer uses an unsupported activation function
        """
        start_idx = self.batch_size * batch_idx
        end_idx = start_idx + self.__get_curr_batch_size(batch_idx)
        y_true = np.array(self.y_train[start_idx:end_idx])
        y_pred = np.array(self.prediction)

        # Get the activation function of the output layer
        act_func = self.layers[-1].activation_function

        # Calculate the error based on the activation function
        if act_func in ['relu', 'sigmoid', 'linear']:
            return 0.5 * np.sum((y_true - y_pred) ** 2)
        elif act_func == 'softmax':
            return -np.sum(y_true * np.log(y_pred))
        else:
            raise ValueError(f"Unsupported activation function: {act_func}")

    def __update_weights(self):
        """Add the accumulated deltas, scaled by the learning rate, to the weights and bias weights."""
        self.weights = [np.array(self.weights[k]) + np.array(self.d_weights[k]) * self.learning_rate for k in
                        range(len(self.weights))]
        self.bias_weights = [np.array(self.bias_weights[k]) + np.array(self.d_bias_weights[k]) * self.learning_rate for
                             k in range(len(self.bias_weights))]

    def __init_d_weights(self):
        """Reset the delta accumulators to zeros shaped like the weights and bias weights."""
        self.d_weights = [np.array([np.zeros(len(neuron_weight)) for neuron_weight in layer_weight])
                          for layer_weight in self.weights]
        self.d_bias_weights = [np.zeros(layer.number_of_neurons) for layer in self.layers]

    def __calc_output_diff(self, x_idx: int) -> np.ndarray:
        """
        :param x_idx:  the index of the current input inside the current batch
        :return: target minus prediction for every output neuron
        """
        y_train = self.expected_output[x_idx]  # get the expected output of the x
        output = self.prediction[x_idx]  # get the prediction
        return np.array([y - p for y, p in zip(y_train, output)])

    def __calc_act_function_derivative(self, act_func: str, y: list, target=None) -> np.ndarray:
        """
        :param act_func: name of the activation function
        :param y:  y is the output in a layer
        :param target: index of the target class, required for softmax only

        :return : a 1D array with the gradient of the activation for every neuron in a layer
        :raises ValueError: for an unknown activation or a missing softmax target
        """
        if act_func == 'sigmoid':
            return np.array([x * (1-x) for x in y])

        elif act_func == 'relu':
            return np.array([1 if x > 0 else 0 for x in y])

        elif act_func == 'linear':
            return np.array([1 for _ in y])

        elif act_func == 'softmax':
            if target is None:
                raise ValueError("Target is required for softmax gradient")
            return np.array([-1 * (1-y[i]) if target == i else y[i] for i in range(len(y))])

        else:
            raise ValueError(f"Unknown activation function: {act_func}")


    def __calc_output_layer_delta(self, x_idx: int) -> np.ndarray:
        """
        :param x_idx:  the index of the current input inside the current batch
        :return: the error term of every output neuron
        """
        # get the activation function for the last layer (output layer)
        act_func = self.layers[-1].activation_function  # get the activation function
        if act_func == 'softmax':
            # for softmax the error term is the plain difference target - prediction
            return self.__calc_output_diff(x_idx)
        return self.__calc_act_function_derivative(act_func, self.prediction[x_idx]) * self.__calc_output_diff(x_idx)

    def __calc_hidden_layer_delta(self, batch_idx, layer_idx: int, output_error_term: np.ndarray) -> np.ndarray:
        """
        :param output_error_term: column vector with the error terms calculated for the layer after
        :param layer_idx: the index of the current layer
        :param batch_idx: the index of the current sample inside the current batch

        hidden layer gradient = net gradient of the neuron values of current layer * the sum of weight * output error term
        """
        act_func = self.layers[layer_idx].activation_function
        activation_func_derivative = self.__calc_act_function_derivative(act_func,
                                                                         self.neuron_values[layer_idx][batch_idx])

        # (neurons x next_neurons) @ (next_neurons x 1) -> take the single column
        sum_d_net = np.matmul(self.weights[layer_idx + 1], output_error_term)[:, 0]
        return np.array(activation_func_derivative
                        * sum_d_net)

    def __get_curr_batch_size(self, batch_idx):
        """
        Number of samples in the given batch.

        :param batch_idx: the index of the batch
        :return: the remainder for the last batch when the data does not divide evenly,
                 otherwise ``batch_size``
        """
        num_of_samples = len(self.X_train)
        mod_res = num_of_samples % self.batch_size
        # only the LAST batch can be shorter than batch_size
        last_batch_idx = math.ceil(num_of_samples / self.batch_size) - 1
        if batch_idx == last_batch_idx and mod_res != 0:
            return mod_res
        return self.batch_size

    def __print_final_weights(self):
        """Print the expected weights/stop reason followed by the actual ones."""
        print("========= EXPECTED =========")
        for weight in self.expected_weights:
            print("[")
            for neuron_weight in weight:
                print("  ", neuron_weight)
            print("], ")
        print("STOPPED BY: ", self.expected_stopped_by)

        print("========== ACTUAL ==========")

        for i in range(len(self.weights)):
            print("[")
            print("  ", self.bias_weights[i])
            for neuron_weight in self.weights[i]:
                print("  ", neuron_weight)
            print("], ")
        print("STOPPED BY: ", self.stopped_by)

## Main Program

Dijalankan untuk melakukan pengujian berdasarkan test case yang diberikan. Test case diuji dengan menuliskan path dari file test case pada input.

In [3]:
import json

# Ask for the test-case file and parse it; the context manager closes the
# file handle as soon as the JSON has been read.
file_path = input("Enter json file path: ")
with open(file_path) as f:
    data = json.load(f)

# Largest sum squared error between expected and actual weights that still passes
max_sse = 1e-7

try:
    # Build the layer descriptions, rejecting unsupported activation functions
    data_layers = data["case"]["model"]["layers"]
    layers = []
    for layer in data_layers:
        activation_func = layer["activation_function"]
        if activation_func not in ["linear", "relu", "sigmoid", "softmax"]:
            raise Exception("Activation function " + activation_func + " not available")
        layers.append(FFNNLayer(layer["number_of_neurons"], activation_func))

    # Training data and hyper-parameters of the test case
    weights = data["case"]["initial_weights"]
    input_size = data["case"]["model"]["input_size"]
    X_train = data["case"]["input"]
    y_train = data["case"]["target"]
    learning_rate = data["case"]["learning_parameters"]["learning_rate"]
    batch_size = data["case"]["learning_parameters"]["batch_size"]
    max_iteration = data["case"]["learning_parameters"]["max_iteration"]
    error_threshold = data["case"]["learning_parameters"]["error_threshold"]

    # Expected outcome the trained model is compared against
    expected_weights = data["expect"]["final_weights"]
    expected_stopped_by = data["expect"]["stopped_by"]

    model = MLPClassifier(layers, learning_rate, error_threshold, max_iteration, batch_size, weights, expected_stopped_by, expected_weights)

    model.fit(X_train, y_train)

    # Compare the final weights with the expected ones
    sse = model.calculate_sse()
    print(f"Sum Squared Error: {sse:.4f}")
    if sse < max_sse:
        print("Sum Squared Error(SSE) of prediction is lower than Maximum SSE")
    else:
        print("Sum Squared Error(SSE) of prediction surpass the Maximum SSE")
except KeyError as ke:
    print('Key', ke, "not found in json data. Please check your json data format")
except Exception as error:
    print("An exception occurred: ", error)

[
   [-0.28730211, -0.28822282, -0.70597451, 0.42094471]
   [-0.5790794, -1.1836444, -1.34287961, 0.69575311]
   [-0.41434377, 1.51314676, -0.97649086, -1.3043465]
], 
[
   [-1.72078607, 1.74078607]
   [-0.50352956, 0.48352956]
   [1.25764816, -1.23764816]
   [-1.16998784, 1.14998784]
   [1.0907634, -1.0707634]
], 
STOPPED BY:  error_threshold
[
   [-0.28730211 -0.28822282 -0.70597451  0.42094471]
   [-0.5790794  -1.1836444  -1.34287961  0.69575311]
   [-0.41434377  1.51314676 -0.97649086 -1.3043465 ]
], 
[
   [-1.72078607  1.74078607]
   [-0.50352956  0.48352956]
   [ 1.25764816 -1.23764816]
   [-1.16998784  1.14998784]
   [ 1.0907634 -1.0707634]
], 
STOPPED BY:  error_threshold
Sum Squared Error: 0.0000
Sum Squared Error(SSE) of prediction is lower than Maximum SSE


## Pengujian pada Dataset Iris

Dataset iris perlu melalui tahap preprocessing terlebih dahulu, sehingga dibuat kelas berikut:

In [4]:
import pandas as pd
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.model_selection import train_test_split


class DataPreprocessor:
    """
    Loads a CSV data set and prepares it for training: train/test split,
    feature standardization and label encoding.
    """

    def __init__(self, file_path: str = None):
        """
        :param file_path: optional path of a CSV file that is loaded immediately.
        """
        self.data = None
        self.encoder = LabelEncoder()
        self.scaler = StandardScaler()

        if file_path is not None:
            self.load_data(file_path)

    def load_data(self, file_path: str):
        """
        Reads the CSV file at ``file_path`` into ``self.data``.

        :param file_path: The path of the CSV file.
        """
        self.data = pd.read_csv(file_path)

    def preprocess(self, target_column: str, test_size: float = 0.2, random_state=None):
        """
        Preprocesses the data by splitting it, standardizing the features, and encoding the labels.

        :param target_column: The name of the column to use as the target (label).
        :param test_size: The proportion of the dataset to include in the test split.
        :param random_state: Seed forwarded to the train/test split; pass an int to get
                             the same split on every run (None keeps the split random).

        :return: The standardized features and encoded labels for the training and testing data.
        """
        X_train, X_test, y_train, y_test = self.__split_data(test_size, target_column, random_state)
        X_train_standardized, X_test_standardized = self.__standardize_data(X_train, X_test)
        y_train_encoded, y_test_encoded = self.__encode_data(y_train, y_test)

        return X_train_standardized, X_test_standardized, y_train_encoded, y_test_encoded

    def __split_data(self, test_size: float = 0.2, stratify_column_label: str = None, random_state=None):
        """
        Splits the data into training and testing sets.

        :param test_size: The proportion of the dataset to include in the test split.
        :param stratify_column_label: The name of the label column; it is removed from the
                                      features and used for stratification.
        :param random_state: Seed forwarded to ``train_test_split`` for a reproducible split.

        :return: The training and testing data and labels.
        """
        X = self.data.drop(columns=[stratify_column_label])
        y = self.data[stratify_column_label]

        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=test_size,
                                                            random_state=random_state,
                                                            stratify=y if stratify_column_label else None)

        return X_train, X_test, y_train, y_test

    def __standardize_data(self, X_train, X_test):
        """
        Standardizes the features in the training and testing data.

        The scaler is fitted on the training data only and then applied to the testing data.

        :param X_train: The training data.
        :param X_test: The testing data.

        :return: The standardized training and testing data.
        """
        X_train_standardized = self.scaler.fit_transform(X_train)
        X_test_standardized = self.scaler.transform(X_test)

        return X_train_standardized, X_test_standardized

    def __encode_data(self, y_train, y_test):
        """
        Encodes the labels in the training and testing data.

        The encoder is fitted on the training labels only and then applied to the testing labels.

        :param y_train: The labels for the training data.
        :param y_test: The labels for the testing data.

        :return: The encoded labels for the training and testing data.
        """
        y_train_encoded = self.encoder.fit_transform(y_train)
        y_test_encoded = self.encoder.transform(y_test)

        return y_train_encoded, y_test_encoded

    def decode_labels(self, y_encoded: list):
        """
        Decodes the encoded labels back to the original categorical labels.

        :param y_encoded: The encoded labels.

        :return: The original categorical labels.
        """
        y_original = self.encoder.inverse_transform(y_encoded)
        return y_original

Akan dilakukan pengujian pada dataset iris dengan parameter sebagai berikut:
* Struktur jaringan: 2 hidden layer dengan masing-masing 3 neuron dan fungsi aktivasi ReLU
* Initial weights: diinisialisasi secara acak dengan nilai dalam interval [-0.5, 0.5]
* learning_rate: 0.1
* error_threshold: 0.0001
* max_iter: 100
* batch_size: 150

### Preprocessing data

In [5]:
# Load the Iris dataset 
# Location of the Iris CSV file and the name of its label column
iris_csv_path = "test_cases_mlp/iris.csv"
target_column = "Species"

# Read the Iris data set into the preprocessor
preprocessor = DataPreprocessor(iris_csv_path)

# Split into training/test sets, standardize the features and encode the labels
X_train, X_test, y_train, y_test = preprocessor.preprocess(target_column)

### Pengujian pada kelas implementasi MLPClassifier

### Pengujian menggunakan library scikit-learn