In [1]:
# Imports
import numpy as np
import pandas as pd
from sklearn.model_selection    import train_test_split
from sklearn.preprocessing      import LabelEncoder
from sklearn.metrics            import accuracy_score
from joblib                     import dump, load
from datetime                   import datetime
from Utils.ActivationFunction   import ActivationFunction

In [None]:
class NeuralNetwork:
    """Feed-forward network trained with backpropagation.

    Layers are appended with :meth:`add_layer` and evaluated in insertion
    order. Training is online: the weights are updated after every sample,
    because each neuron only remembers the inputs and delta of its most
    recent forward/backward pass. ``batch_size`` therefore only groups
    samples for error averaging.

    Args:
        input_data: Sequence of ``(inputs, targets)`` pairs. It must support
            ``len()`` and slicing. ``targets`` may be a scalar (single output
            neuron) or a sequence with one value per output neuron.
        learning_rate: Step size handed to every layer on weight updates.
        err_threshold: Training stops once the epoch error is at or below it.
        max_iter: Maximum number of epochs.
        batch_size: Number of consecutive samples whose errors are averaged
            together.
        debug: When true, ``forward_propagate`` prints every layer's output
            and ``train`` prints one summary line per epoch.
    """

    def __init__(self, input_data, learning_rate, err_threshold, max_iter, batch_size, debug = True):
        self.input_data = input_data
        self.layers = []
        self.learning_rate = learning_rate
        self.err_threshold = err_threshold
        self.max_iter = max_iter
        self.batch_size = batch_size
        self.debug = debug

    def add_layer(self, n_inputs: int, n_neurons: int, activ_func):
        """Append a layer of ``n_neurons`` neurons, each taking ``n_inputs`` inputs."""
        self.layers.append(NeuralLayer(n_inputs, n_neurons, activ_func))

    def _feed_through_layers(self, inputs, trace):
        """Push one sample through every layer, optionally printing each layer's output."""
        for i, layer in enumerate(self.layers):
            inputs = layer.feed_forward(inputs)
            if trace:
                print(f'Layer {i+1}, Output: {inputs}')
        return inputs

    def forward_propagate(self, inputs):
        """Return the last layer's outputs for a single sample.

        With no layers the inputs are returned unchanged. Per-layer outputs
        are printed when ``debug`` is set.
        """
        return self._feed_through_layers(inputs, trace=self.debug)

    def backward_propagate(self, target_output):
        """Compute the delta of every neuron for the most recent forward pass.

        Args:
            target_output: Expected value per output neuron, or a scalar when
                the last layer has a single neuron.

        Raises:
            ValueError: If the network has no layers, or the number of target
                values differs from the number of output neurons.
        """
        if not self.layers:
            raise ValueError("Cannot back-propagate through a network without layers")

        output_neurons = self.layers[-1].neurons
        targets = np.ravel(target_output)
        if len(targets) != len(output_neurons):
            raise ValueError(
                f"Expected {len(output_neurons)} target value(s), got {len(targets)}"
            )

        # Output units: the error is (target - output); the neuron multiplies it
        # by its activation derivative when it stores the delta.
        for target, output_neuron in zip(targets, output_neurons):
            output_neuron.calculate_delta(target - output_neuron.output)

        # Hidden units, last hidden layer first: the error of unit j is the sum
        # over the next layer of (weight from j to that neuron) * (its delta).
        for l in range(len(self.layers) - 2, -1, -1):
            layer = self.layers[l]
            next_layer = self.layers[l + 1]

            for j, neuron in enumerate(layer.neurons):
                error = sum(next_neuron.weights[j] * next_neuron.delta for next_neuron in next_layer.neurons)
                neuron.calculate_delta(error)

    def calculate_total_error(self, outputs, targets):
        """Return the mean squared error between ``outputs`` and ``targets``.

        Both arguments may be scalars or sequences; they are flattened first.

        Raises:
            ValueError: If the two arguments hold a different number of values.
        """
        outputs = np.ravel(np.asarray(outputs, dtype=float))
        targets = np.ravel(np.asarray(targets, dtype=float))
        if outputs.size != targets.size:
            raise ValueError(
                f"Got {outputs.size} output value(s) but {targets.size} target value(s)"
            )
        return float(np.mean((targets - outputs) ** 2))

    def update_weights(self):
        """Apply the stored deltas to every layer using ``learning_rate``."""
        for layer in self.layers:
            layer.update_weights(self.learning_rate)

    def predict(self, input_data):
        """Return the network outputs for a single sample."""
        return self.forward_propagate(input_data)

    def train(self):
        """Train on ``input_data`` until ``err_threshold`` or ``max_iter`` is reached.

        Every sample triggers a forward pass, a backward pass and a weight
        update. A sample's error is measured on the outputs produced before
        its update. The epoch error is the mean of the per-batch mean errors.
        Per-layer debug traces are not printed during training; only the
        per-epoch summary is.

        Returns:
            list[float]: The epoch error of every epoch that was run.

        Raises:
            ValueError: If ``input_data`` is empty or ``batch_size`` is < 1.
        """
        n_samples = len(self.input_data)
        if n_samples == 0:
            raise ValueError("input_data is empty; there is nothing to train on")
        if self.batch_size < 1:
            raise ValueError("batch_size must be a positive integer")

        history = []
        for itr in range(self.max_iter):
            batch_errors = []

            for start in range(0, n_samples, self.batch_size):
                batch = self.input_data[start:start + self.batch_size]

                sample_errors = []
                for inputs, targets in batch:
                    outputs = self._feed_through_layers(inputs, trace=False)
                    self.backward_propagate(targets)
                    self.update_weights()
                    sample_errors.append(self.calculate_total_error(outputs, targets))
                batch_errors.append(np.mean(sample_errors))

            # Average over the batches actually processed (the last one may be short)
            total_error = float(np.mean(batch_errors))
            history.append(total_error)

            if self.debug:
                print(f"Epoch {itr+1}/{self.max_iter}, Total Error: {total_error}")

            # Stop training once the error threshold is reached
            if total_error <= self.err_threshold:
                print(f"Training complete at epoch {itr+1} with total error {total_error} below threshold {self.err_threshold}.")
                break

        return history

class NeuralLayer:
    """A group of neurons that all receive the same inputs.

    Each created layer takes the next value of a class-wide counter as its
    number, which is shown by ``str(layer)``.
    """

    # Number of layers created so far, shared by all instances
    __counter = 0

    def __init__(self, n_inputs, n_neurons, activ_func):
        """Create ``n_neurons`` neurons with ``n_inputs`` weights each and the given activation."""
        NeuralLayer.__counter += 1

        members = []
        for _ in range(n_neurons):
            members.append(Neuron(n_inputs, activ_func))
        self.__neurons = members

        # Instance attribute: this layer's own number
        self.__counter = NeuralLayer.__counter

    @property
    def neurons(self):
        """The list of neurons in this layer."""
        return self.__neurons

    @property
    def deltas(self):
        """The current delta of every neuron, in neuron order."""
        collected = []
        for member in self.neurons:
            collected.append(member.delta)
        return collected

    def feed_forward(self, inputs):
        """Return a list with each neuron's output for ``inputs``."""
        results = []
        for member in self.neurons:
            results.append(member.calculate_output(inputs))
        return results

    def update_weights(self, learning_rate):
        """Ask every neuron to update its weights with ``learning_rate``."""
        for member in self.neurons:
            member.update_weights(learning_rate)

    def __str__(self):
        return f'---> Layer {self.__counter}, # of neurons: {len(self.neurons)}'

class Neuron:
    """A single unit: weighted sum of its inputs passed through an activation.

    Args:
        n_weights: Number of inputs (and therefore weights) of the neuron.
        activ_func: Callable applied to the weighted sum; it must also offer
            ``dfunc``, which is called with the neuron's output when the
            delta is computed.
        bias: Stored and shown by ``str(neuron)``, but currently not added to
            the weighted sum and not updated during training.
    """

    def __init__(self, n_weights: int, activ_func, bias: float = 1.0):
        # Weights start as uniform random values in [0, 1)
        self.__weights = np.random.rand(n_weights)
        self.__bias = bias
        self.__activation = activ_func
        self.__output = 0
        self.__inputs = []
        self.__delta = 0
        self.__n_weights = n_weights

    @property
    def output(self):
        """Output of the most recent ``calculate_output`` call (0 before the first)."""
        return self.__output

    @property
    def delta(self):
        """Delta stored by the most recent ``calculate_delta`` call (0 before the first)."""
        return self.__delta

    @property
    def weights(self):
        """The weight array itself (not a copy)."""
        return self.__weights

    def calculate_output(self, inputs):
        """Return ``activation(dot(inputs, weights))`` and remember inputs and output.

        The inputs are converted to a float array, so the later weight update
        works on positions even for label-indexed containers such as a pandas
        Series.
        """
        inputs = np.asarray(inputs, dtype=float)
        weighted_sum = np.dot(inputs, self.__weights)
        self.__output = self.__activation(weighted_sum)
        self.__inputs = inputs
        return self.__output

    def calculate_delta(self, error):
        """Store ``error * activation.dfunc(output)`` as this neuron's delta."""
        self.__delta = error * self.__activation.dfunc(self.__output)

    def update_weights(self, learning_rate):
        """Add ``learning_rate * delta * input`` to every weight, in place.

        Raises:
            ValueError: If the remembered inputs do not hold exactly one value
                per weight (for example when no forward pass has run yet).
        """
        inputs = np.asarray(self.__inputs, dtype=float)
        if inputs.shape != self.__weights.shape:
            raise ValueError(
                "update_weights() needs a prior calculate_output() call with one input value per weight"
            )
        self.__weights += learning_rate * self.__delta * inputs

    def __str__(self):
        return '--- weights = {}, bias = {}'.format(self.__weights, self.__bias)

Dataset Preprocessing

In [None]:
# Read the raw Iris table
data = pd.read_csv('Datasets/iris.csv')

# Swap the species names for integer class codes; `le` is reused later to map codes back
le = LabelEncoder()
data = data.assign(Species=le.fit_transform(data['Species']))

# Target is the encoded species; features are every column except the row id and the target
y = data['Species']
X = data.drop(columns=['Id', 'Species'])

# Hold out 20% of the rows for validation, with a fixed seed
X_train, X_val, y_train, y_val = train_test_split(
    X,
    y,
    test_size=0.2,
    random_state=42,
)

print(f"Training set shape: {X_train.shape}")
print(f"Validation set shape: {X_val.shape}")

Model Training and Saving

In [None]:
# NeuralNetwork.train() unpacks every item of input_data as (inputs, targets),
# so pair each feature row with a one-hot vector of its encoded class.
n_features = X_train.shape[1]
n_classes = len(le.classes_)
n_hidden = 3
train_samples = list(zip(
    X_train.to_numpy(dtype=float),
    np.eye(n_classes)[y_train.to_numpy()],
))

# Initialize NeuralNetwork class
nn = NeuralNetwork(input_data=train_samples, learning_rate=0.01, err_threshold=0.01, max_iter=1000, batch_size=32, debug=True)

# Adding the layers: layer sizes follow the data instead of being hard-coded,
# with one output neuron per class so every class code can be represented.
nn.add_layer(n_inputs=n_features, n_neurons=n_hidden, activ_func=ActivationFunction(types='Linear'))
nn.add_layer(n_inputs=n_hidden, n_neurons=n_classes, activ_func=ActivationFunction(types='Sigmoid'))

# Train the NeuralNetwork
nn.train()

# Dump the model under a timestamped name so earlier runs are not overwritten
timestamp = datetime.now().strftime('%Y-%m-%d_%H-%M-%S')
file_name = f'Models/NeuralNetwork_{timestamp}.joblib'
dump(nn, file_name)
print(f"Model saved successfully as {file_name}.")

Prediction Results

In [None]:
# Run every validation row through the trained network and take the index of
# the strongest output neuron as the predicted class code.
# NOTE(review): assumes the network ends in one output neuron per encoded class - confirm against the layer setup used for training.
y_pred = np.array([
    int(np.argmax(nn.predict(sample)))
    for sample in X_val.to_numpy(dtype=float)
])

# Map the class codes back to the original species names
y_pred_labels = le.inverse_transform(y_pred)

# Calculate accuracy on the encoded labels
accuracy = accuracy_score(y_val, y_pred)
print(f"Validation Accuracy: {accuracy}")