In [256]:
import sys  
import time  
import math  
import uuid  
import numpy as np  
import matplotlib.pyplot as plt
import pickle
import plotly.graph_objs as go
from numpy import errstate  
from sklearn.datasets import fetch_openml  
from sklearn.model_selection import train_test_split  
from sklearn.preprocessing import OneHotEncoder  

In [257]:
sys.setrecursionlimit(10000)  

In [258]:
class VarValue:
    def __init__(self, value, children=(), varname=''):
        self.varname = varname
        self.value = value
        self.derivative_to = {}
        self.children = children

    def __chain_rule(self, dSelfdx, child):
        if(child.varname[:5] != 'const'):
            for grandchild_varname in child.derivative_to:
                if(grandchild_varname != 'const'):
                    if(grandchild_varname in self.derivative_to):

                        self.derivative_to[grandchild_varname] += dSelfdx * child.derivative_to[grandchild_varname]
                    else:
                        self.derivative_to[grandchild_varname] = dSelfdx * child.derivative_to[grandchild_varname]
            if(len(self.derivative_to) == 0):
              raise ValueError(self.varname, child.varname)

    def relu(self):
        out = VarValue(max(0,self.value), children=(self,), varname='out_relu_'+str(uuid.uuid4()))
        if(self.varname[:5] != 'const'):
            dodx = 0 if self.value <= 0 else 1
            if(len(self.children) == 0):
                out.derivative_to[self.varname] = dodx
            else:
                out.__chain_rule(dodx, self)
        return out

    def ln(self):
        out = VarValue(math.log(self.value), children=(self,), varname='out_ln_'+str(uuid.uuid4()))
        if(self.varname[:5] != 'const'):
            dodx = 1/(self.value)
            if(len(self.children) == 0):
                out.derivative_to[self.varname] = dodx
            else:
                out.__chain_rule(dodx, self)
        return out

    def __mul__(self, other):
        other = other if isinstance(other, VarValue) else VarValue(other, varname='const'+str(uuid.uuid4()))
        out = VarValue(self.value * other.value, children=(self, other), varname='out_mul_'+str(uuid.uuid4()))

        if(self.varname[:5] != 'const'):
            dodx1 = other.value
            if(len(self.children) == 0):
                out.derivative_to[self.varname] = dodx1
            else:
                out.__chain_rule(dodx1, self)
        if(other.varname[:5] != 'const'):
            dodx2 = self.value
            if(len(other.children) == 0):
                out.derivative_to[other.varname] = dodx2
            else:
                out.__chain_rule(dodx2, other)
        return out

    def __add__(self, other):
        other = other if isinstance(other, VarValue) else VarValue(other, varname='const'+str(uuid.uuid4()))
        out = VarValue(self.value + other.value, children=(self, other), varname='out_add_'+str(uuid.uuid4()))
        if(self.varname[:5] != 'const'):
            dodx1 = 1
            if(len(self.children) == 0):
                out.derivative_to[self.varname] = dodx1
            else:
                out.__chain_rule(dodx1, self)
        if(other.varname[:5] != 'const'):
            dodx2 = 1
            if(len(other.children) == 0):
                out.derivative_to[other.varname] = dodx2
            else:
                out.__chain_rule(dodx2, other)
        return out

    def __sub__(self, other):
        return self + (-other)

    def __truediv__(self, other):
        return self * other**-1

    def __neg__(self):
        return self * -1

    def __pow__(self, other):
        other = other if isinstance(other, VarValue) else VarValue(other, varname='const'+str(uuid.uuid4()))
        try:
            with errstate(over='raise', invalid='raise'):
                result = float(self.value) ** float(other.value)
        except (OverflowError, FloatingPointError):
            if abs(float(self.value)) > 1:
                result = math.inf
            else:
                result = 0

        out = VarValue(result, children=(self, other), varname='out_pow_'+str(uuid.uuid4()))

        if(self.varname[:5] != 'const'):
            dodx = other.value * self.value**(other.value-1)
            if(len(self.children) == 0):
                out.derivative_to[self.varname] = dodx
            else:
                out.__chain_rule(dodx, self)
        return out

    def __rmul__(self, other):
        return self * other

    def __radd__(self, other):
        return self + other

    def __rsub__(self, other):
        return other + (-self)

    def __rpow__(self, other):
        other = other if isinstance(other, VarValue) else VarValue(other, varname='const'+str(uuid.uuid4()))
        out = VarValue(other.value**self.value, children=(self, other), varname='out_rpow_'+str(uuid.uuid4()))
        if self.varname:
            dodx = other.value**self.value * math.log(other.value)
            if(len(self.children) == 0):
                out.derivative_to[self.varname] = dodx
            else:
                out.__chain_rule(dodx, self)
        return out

    def __rtruediv__(self, other):
        return other * self**-1

    # Equality
    def __eq__(self, other):
        if isinstance(other, VarValue):
            return self.varname == other.varname
        return self.varname == other

    # Inequality
    def __ne__(self, other):
        return not self.__eq__(other)

    # Less than
    def __lt__(self, other):
        if isinstance(other, VarValue):
            return self.value < other.value
        return self.value < other

    # Less than or equal
    def __le__(self, other):
        if isinstance(other, VarValue):
            return self.value <= other.value
        return self.value <= other

    # Greater than
    def __gt__(self, other):
        if isinstance(other, VarValue):
            return self.value > other.value
        return self.value > other

    # Greater than or equal
    def __ge__(self, other):
        if isinstance(other, VarValue):
            return self.value >= other.value
        return self.value >= other

In [259]:
class Layer:
    def __init__(self,n_neurons=3, init='zero', activation='relu'):
        self.n_neurons = n_neurons
        self.current_input_batch = None
        self.init = init    # zero/uniform/normal - harusnya gaperlu disini, ini di layer langsung harusnya
        self.weights = None
        self.biases = None
        self.activation = activation    # linear/relu/sigmoid/tanh/softmax/binary_step/leaky_relu/prelu/elu/swish(ini)/gelu(ini)
        self.learning_rate = None

        self.net = None
        self.out = None

    def __update_weights_dEdW(self, dEdW):
        self.weights -= self.learning_rate*dEdW

    def __update_biases_dEdB(self, dEdB):
        self.biases -= self.learning_rate*dEdB

    def __update_weights_err_term(self, err_term):
        for input in self.current_input_batch:
            for i in self.weights:
                for j in i:
                    self.weights += self.learning_rate*err_term[j]*self.input[i]

    def __update_biases_err_term(self, err_term):
        for i in self.weights:
            for j in i:
                self.weights += self.learning_rate*err_term[j]*1

    def forward(self, current_input_batch):
        self.current_input_batch = current_input_batch

        if(self.weights is None):
            if(self.init == 'zero'):
                self.weights = np.array([[VarValue(0,varname='w_'+str(uuid.uuid4())) for _ in range(self.n_neurons)] for _ in range(len(self.current_input_batch[0]))])
                self.biases = np.array([VarValue(0,varname='b_'+str(uuid.uuid4())) for _ in range(self.n_neurons)])
            elif(self.init == 'uniform'):
                # TODO: Initialization uniform
                pass
            elif(self.init == 'normal'):
                # TODO: Initialization normal
                pass

        self.net = (np.dot(self.current_input_batch, self.weights)) + self.biases

        if(self.activation == 'linear'):
            self.out = self.net

        elif(self.activation == 'relu'):
            self.out = np.array([[net.relu() for net in row] for row in self.net])

        elif(self.activation == 'sigmoid'):
            for i in self.net:
                for j in i:
                    j.value = np.clip(j.value, -500, 500)
            self.out = 1 / (1 + (math.exp(1))**(-self.net))

        elif(self.activation == 'tanh'):
            self.out = (math.exp(1)**self.net - math.exp(1)**(-self.net))/(math.exp(1)**self.net + math.exp(1)**(-self.net))

    def backward(self, sum_err_term_k_w=None, err=None, method='instant_deriv'):
        
        if(method == 'instant_deriv'):

            dEdW = np.array([[ err.derivative_to[neuron_w.varname] for neuron_w in weights] for weights in self.weights])
            dEdB = np.array([ err.derivative_to[bias.varname] for bias in self.biases])

            self.__update_weights_dEdW(dEdW)
            self.__update_biases_dEdB(dEdB)

        else:
            # Versi pake Error Term - Not Tested Yet
            if(err):
                # Output Layer
                err_term = np.array([err.derivative_to[net.varname] for net in self.net])
            else:
                # Hidden Layer
                err_term = sum_err_term_k_w
                pass
            sum_err_term_w_current = self.weights @ err_term.T
            self.__update_weights_err_term(err_term)
            self.__update_biases_err_term(err_term)
            return sum_err_term_w_current,

    def clean_derivative(self):
        for input in self.current_input_batch:
            for x in input:
                x.derivative_to.clear()
                x.children = ()

        for i in self.weights:
            for j in i:
                j.derivative_to.clear()
                j.children = ()

        for b in self.biases:
            b.derivative_to ={}
            b.children = ()

        for i in self.net:
            for j in i:
                j.derivative_to.clear()
                j.children = ()

        for i in self.out:
            for j in i:
                j.derivative_to.clear()
                j.children = ()

In [None]:
class FFNN:
    def __init__(self, loss='mse', batch_size=1, learning_rate=0.01, epochs=20, verbose=0):
        self.loss = loss    # mse/bce/cce
        self.batch_size=batch_size
        self.learning_rate=learning_rate
        self.epochs=epochs
        self.verbose=verbose
        self.layers = None
        self.weights = []
        self.bias = []
        self.x = None
        self.y = None
        self.onehot_encoder = OneHotEncoder(categories='auto')

    def __loss(self, out, target):
        if(out.shape != target.shape):
            print("Output shape: ", out.shape)
            print("Target shape: ", target.shape)
            raise ValueError("Shape not match")
        if self.loss == 'mse':
            mse = (1/target.shape[1])*np.square(target - out)
            return np.sum(mse)
        elif self.loss == 'cce':
            epsilon = 1e-15
            out = np.clip(out, epsilon, 1 - epsilon)
            return -np.sum(target * np.log(out)) / self.batch_size

    def build_layers(self, *layers: Layer):
        self.layers = layers
        for layer in self.layers:
            layer.learning_rate = self.learning_rate

    def fit(self, x, y):
        self.x = x
        self.y = self.onehot_encoder.fit_transform(y.reshape(-1, 1))
        total_batch = (len(x)+self.batch_size-1)//self.batch_size
        start_global = time.time()
        for epoch in range(self.epochs):
            print(f"Epoch {epoch+1}")
            start = time.time()
            count_batch = 1
            for i in range(total_batch):
                x_batch = self.x[i*self.batch_size:(i+1)*self.batch_size] if ((i+1) < total_batch) else self.x[i*self.batch_size:]
                y_batch = self.y[i*self.batch_size:(i+1)*self.batch_size] if ((i+1) < total_batch) else self.y[i*self.batch_size:]
                batch_input = x_batch
                for layer in self.layers:
                    layer.forward(batch_input)
                    batch_input = layer.out
                out = batch_input
                error = self.__loss(out, y_batch)
                count = 1

                for layer in self.layers[::-1]:
                    layer.backward(err=error)
                    count += 1
                count_batch += 1

                # Ini gaperlu harusnya gapapa
                for layer in self.layers:
                    layer.clean_derivative()

            end = time.time()
            print("Dur", end-start)

        end_global = time.time()
        print("Dur Global", end_global-start_global)

    def predict(self, x_predict):
        batch_input = x_predict
        for layer in self.layers:
            layer.forward(batch_input)
            batch_input = layer.out
        out = batch_input
        return out

    def visualize(self):
        num_layers = len(self.layers)
        
        # Color palette
        layer_colors = {
            0: 'yellow',    # Input layer
            -1: 'salmon',   # Output layer
            'hidden': 'lightblue'  # Hidden layers
        }
        
        nodes_x = []
        nodes_y = []
        node_colors = []
        node_texts = []
        
        edges_x = []
        edges_y = []
        edge_texts = []
        
        for layer_idx, layer in enumerate(self.layers):
            # Atur2 warnanya wkwk
            if num_layers == 1:
                color = layer_colors[-1]
            elif layer_idx == 0:
                color = layer_colors[0]
            elif layer_idx == num_layers - 1:
                color = layer_colors[-1]
            else:
                color = layer_colors['hidden']
            
            n_neurons = layer.n_neurons
            y_positions = np.linspace(0, 1, n_neurons)
            x_pos = layer_idx / (num_layers - 1) if num_layers > 1 else 0.5
            
            for neuron_idx, y_pos in enumerate(y_positions):
                nodes_x.append(x_pos)
                nodes_y.append(y_pos)
                node_colors.append(color)
                node_texts.append(f"Layer {layer_idx}, Neuron {neuron_idx}<br>Activation: {layer.activation}")
            
            if layer_idx < num_layers - 1:
                next_layer = self.layers[layer_idx + 1]
                next_x = (layer_idx + 1) / (num_layers - 1) if num_layers > 1 else 0.5
                next_n_neurons = next_layer.n_neurons
                next_y_positions = np.linspace(0, 1, next_n_neurons)
                
                for curr_neuron_idx, curr_y in enumerate(y_positions):
                    for next_neuron_idx, next_y in enumerate(next_y_positions):
 
                        edges_x.extend([x_pos, next_x, None])
                        edges_y.extend([curr_y, next_y, None])
                        
                        weight_text = "Weight: Not initialized"
                        if layer.weights is not None:
                            try:
                                weight = layer.weights[curr_neuron_idx][next_neuron_idx]
                                weight_text = f"Weight: {weight}"
                            except:
                                weight_text = "Weight: Unavailable"
                        
                        edge_texts.append(weight_text)
        
        # Create nodes
        node_trace = go.Scatter(
            x=nodes_x, y=nodes_y,
            mode='markers',
            hoverinfo='text',
            marker=dict(
                showscale=False,
                color=node_colors,
                size=15,
                line_width=2
            ),
            text=node_texts
        )
        
        # Create edges
        edge_trace = go.Scatter(
            x=edges_x, y=edges_y,
            mode='lines',
            line=dict(width=0.5, color='#888'),
            hoverinfo='text',
            text=edge_texts
        )
        
        fig = go.Figure(data=[edge_trace, node_trace],
                        layout=go.Layout(
                            title='Neural Network Architecture',
                            showlegend=False,
                            hovermode='closest',
                            margin=dict(b=0,l=0,r=0,t=40),
                            xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
                            yaxis=dict(showgrid=False, zeroline=False, showticklabels=False)
                        ))
        
        fig.update_layout(plot_bgcolor='rgba(255,255,255,255)')
        fig.show()

    
    def save(self, filename):
        model_data = {
            'loss': self.loss,
            'batch_size': self.batch_size,
            'learning_rate': self.learning_rate,
            'epochs': self.epochs,
            'layers': self.layers
        }
        with open(filename, 'wb') as f:
            pickle.dump(model_data, f)
        print(f"Model saved to {filename}")

    def load(self, filename):
        with open(filename, 'rb') as f:
            model_data = pickle.load(f)
        self.loss = model_data['loss']
        self.batch_size = model_data['batch_size']
        self.learning_rate = model_data['learning_rate']
        self.epochs = model_data['epochs']
        self.layers = model_data['layers']
        print(f"Model loaded from {filename}")


In [261]:
X, y = fetch_openml("mnist_784", version=1, return_X_y=True, as_frame=False)
y = y.astype(np.uint8)

In [262]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y,
    test_size=65500,
    train_size=10,
    random_state=42,
    stratify=y
)

In [263]:
X_train = X_train / 255.0
X_test = X_test / 255.0

In [264]:
X_train = np.array([[VarValue(x, varname='x_'+str(uuid.uuid4())) for x in row] for row in X_train])

In [265]:
model_ffnn = FFNN(
    loss='mse',
    batch_size=1,
    learning_rate=0.1,
    epochs=2,
    verbose=1,
)

In [266]:
model_ffnn.build_layers(
    Layer(n_neurons=4, init='zero', activation='linear'),
    Layer(n_neurons=3, init='zero', activation='linear'),
    Layer(n_neurons=2, init='zero', activation='linear'),
    Layer(n_neurons=10, init='zero', activation='linear')
)

In [267]:
model_ffnn.fit(X_train, y_train)

Epoch 1
Dur 8.118615865707397
Epoch 2
Dur 6.587096214294434
Dur Global 14.705712080001831


In [268]:
model_ffnn.visualize()

In [269]:
for layer in model_ffnn.layers:
    print("=====Layer======")
    for i in layer.weights:
        print("==Neuron==")
        for j in i:
            print(j.varname)

==Neuron==
out_add_a51120db-a086-4352-92c7-6b4289bed873
out_add_54efdf76-becd-4eab-a11d-66cf139fe457
out_add_79983d27-6f61-4f02-97a9-6f65946bf260
out_add_45c17716-5222-46c9-86a4-b114c2668ff9
==Neuron==
out_add_3c06c70e-9420-4c66-aaa0-d9160f58de5f
out_add_552d11d8-1c40-44c4-90af-8823d3b50f34
out_add_320b2834-865d-4ab2-8d05-cf93d3c00f1b
out_add_ace9f701-34aa-4353-873a-d4277341fd37
==Neuron==
out_add_82baf5cd-38bf-46d0-85fa-7f08c4b6f6d6
out_add_646165a9-cd4c-4e08-9e58-bb4b1ec9b3b8
out_add_0e4075db-4544-41f4-8940-53c3fce34e4e
out_add_e95df38b-ad17-4a0e-a80c-4a75962aab28
==Neuron==
out_add_1d137a3a-eca5-4ac2-98e7-45328eacec8e
out_add_ca330fb3-1191-482b-b897-29be26cc369f
out_add_53d2d07f-2e58-4119-85b2-26fb29197508
out_add_d471addd-228e-483c-add2-fec2cc347deb
==Neuron==
out_add_ee925353-bf35-4557-a31f-2cb844d2bb1a
out_add_ff43ae74-4d1d-481e-85d4-9bb171d67df7
out_add_bbf14e3f-f448-4f31-8fea-8c711095a842
out_add_064fe152-4bd7-4e64-bb2a-ba2a35f01d1d
==Neuron==
out_add_1701ae66-18fa-4275-b24c-ba

In [270]:
for layer in model_ffnn.layers:
    for i in layer.weights:
        for j in i:
            print(j.value)

0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0


In [271]:
y_pred = model_ffnn.predict(X_test[:10])
for h in y_pred:
    max_index = np.argmax(h)
    print(max_index)

4
4
4
4
4
4
4
4
4
4


In [272]:
model_ffnn.y.shape

(10, 10)

In [273]:
for h in y_test[:10].flatten():
    print(h)

9
6
5
9
6
9
8
8
1
2
