In [None]:
# --- Network -----------------------------------------------------------------
# Number of units per layer, listed from the input layer to the output layer.
neurons = [4, 4, 3]
# Initial value of every bias weight. The training code additionally treats
# 0 as "no bias": bias weights are reset to 0 after every update.
starting_bias = 0
# File with previously saved weights to start from (random init if unavailable).
input_weights_filename = "weights.npy"

# --- Data --------------------------------------------------------------------
dataset = "iris"  # "iris" or "autoasocjacja"
# Fraction of the data set that goes into the training set.
# Ignored for "autoasocjacja".
dataset_split_ratio = 0.5
# Shuffle the samples (before the split and again at the start of each epoch).
shuffle = True

# --- Training ----------------------------------------------------------------
lr = 0.1
momentum = 0.0
epochs = 2000
# Stop early once the mean epoch error drops to this value; None disables it.
target_error = None  # float or None

In [None]:
%matplotlib widget
import matplotlib.pyplot as plt
import numpy as np
import random
from sklearn.metrics import confusion_matrix, classification_report
import pandas as pd
import json
import os


# Name the output directory after the run's hyperparameters so that results of
# different configurations never overwrite each other.
_dir_parts = [f"{dataset}", f"sbias{starting_bias}"]
if dataset != "autoasocjacja":
    # The split ratio is meaningless for the fixed autoassociation patterns.
    _dir_parts.append(f"split{int(dataset_split_ratio*100)}")
_dir_parts += [
    f"lr{lr}",
    f"m{momentum}",
    f"e{epochs}",
    f"et{target_error}",
    f"shuffle{int(shuffle)}",
]
output_dir = "_".join(_dir_parts)

# os.rmdir() only removes EMPTY directories, so re-running the notebook with
# the same settings used to raise OSError here. Reuse the directory instead;
# every file written into it later is overwritten on each run anyway.
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Load previously saved weights when available, otherwise initialise randomly.
# All layers live in one 3-D array, each padded with zeros to
# (max(neurons), max(neurons) + 1); column 0 of a layer holds its bias weights.
weights = None
_layer_width = max(neurons)
_expected_shape = (len(neurons) - 1, _layer_width, _layer_width + 1)

# np.load() does not append ".npy" on its own; only add the extension when the
# configured name lacks it (blindly appending it turned the default
# "weights.npy" into "weights.npy.npy", which never exists).
_weights_path = input_weights_filename
if not _weights_path.endswith(".npy"):
    _weights_path += ".npy"

try:
    _loaded = np.load(_weights_path)
except (OSError, ValueError, EOFError) as exc:
    # Missing or unreadable file: fall back to random initialisation, but say so.
    print(f"Could not load weights from {_weights_path!r} ({exc}); using random initialisation.")
else:
    if getattr(_loaded, "shape", None) == _expected_shape:
        weights = _loaded
    else:
        print(
            f"Ignoring {_weights_path!r}: shape {getattr(_loaded, 'shape', None)} "
            f"does not match the expected {_expected_shape}; using random initialisation."
        )

if weights is None:
    weights = np.zeros(_expected_shape)
    for i in range(len(weights)):
        layer = np.random.rand(neurons[i+1], neurons[i]+1)
        layer[:,0] = starting_bias # bias for that layer
        # Copy into the top-left corner of the zero-padded slot. An in-place
        # ndarray.resize() would re-flow the flat buffer and scramble the rows
        # whenever the layer has fewer columns than the padded width.
        weights[i, :neurons[i+1], :neurons[i]+1] = layer
weights

In [None]:
# Build `training_data` / `testing_data` as lists of (input, target) pairs.
training_data = None
testing_data = None
iris = None
if dataset == "iris":
    from sklearn import datasets, preprocessing
    iris = datasets.load_iris()
    # Normalise the feature rows with sklearn's default settings.
    data = preprocessing.normalize(iris.data)
    # One-hot encode the class labels: row k of the identity matrix is the
    # target vector of class k.
    one_hot_targets = np.eye(np.max(iris.target) + 1)[iris.target]
    data = list(data)
    targets = list(one_hot_targets)
    whole_data = list(zip(data, targets))
    if shuffle:
        random.shuffle(whole_data)
    # The first part of the (optionally shuffled) data trains, the rest tests.
    split_at = int(len(whole_data) * dataset_split_ratio)
    training_data, testing_data = whole_data[:split_at], whole_data[split_at:]
elif dataset == "autoasocjacja":
    # Autoassociation: the network has to reproduce each one-hot pattern, and
    # it is evaluated on the very same four patterns it was trained on.
    patterns = ([1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0], [0, 0, 0, 1])
    training_data = [(np.array(p), np.array(p)) for p in patterns]
    testing_data = [(np.array(p), np.array(p)) for p in patterns]
else:
    raise ValueError("Unknown dataset")

In [None]:
def activation(x):
    """Logistic sigmoid, applied element-wise: 1 / (1 + e**-x).

    Uses np.exp instead of np.pow(np.e, ...): np.pow only exists in
    NumPy >= 2.0, while np.exp is available in every NumPy version.

    Args:
        x: scalar or array of pre-activation values.

    Returns:
        Value(s) in the range [0, 1], same shape as ``x``.
    """
    return 1 / (1 + np.exp(-x))
def activation_derivative(x):
    """Derivative of the sigmoid, expressed through the sigmoid's OUTPUT.

    Args:
        x: value(s) already passed through ``activation`` (i.e. s = sigmoid(z)),
            not the raw pre-activation z.

    Returns:
        s * (1 - s), element-wise.
    """
    complement = 1 - x
    return x * complement

In [None]:
# run inference
def inference(weights, data, inputs=None, outputs=None):
    """Propagate one sample through the network and return the output layer.

    All layers are processed at a common padded width of ``max(neurons)``
    units; entry 0 of every layer input is a constant 1 that multiplies the
    bias column of the weight matrix.

    Args:
        weights: sequence of per-layer weight matrices, one per pair of
            consecutive entries in ``neurons`` (as built by the weight
            initialisation: shape (max(neurons), max(neurons) + 1) each).
        data: 1-D input sample; zero-padded (or truncated) to ``max(neurons)``.
        inputs: optional list; a copy of the bias-augmented vector fed into
            each layer is appended to it.
        outputs: optional list; a copy of each layer's padded activation
            vector is appended to it.

    Returns:
        The first ``neurons[-1]`` activations of the last layer.
    """
    sample = np.copy(data)
    sample.resize(max(neurons))

    # add 1 at the beginning so bias properly applies
    sample = np.pad(sample, (1,0), "constant", constant_values=(1))
    for idx, layer in enumerate(weights):
        if inputs is not None:
            inputs.append(np.copy(sample))
        sample = activation(layer @ sample)
        # Units beyond this layer's real width are padding. Their weights are
        # zero, so the sigmoid would make them emit 0.5; that constant would
        # then feed the next layer and be trained like an extra bias input.
        # Force them to 0 so padding never influences the network.
        sample[neurons[idx + 1]:] = 0
        if outputs is not None:
            outputs.append(np.copy(sample))
        sample = np.pad(sample, (1,0), "constant", constant_values=(1))
    return sample[1:(neurons[-1]+1)] # strip all the useless padding

In [None]:
def calc_deltas(output, target, weights, activations):
    """Compute the error term (delta) of every layer for one sample.

    Args:
        output: network output for the sample (unpadded, as returned by
            ``inference``).
        target: expected output, same length as ``output``.
        weights: per-layer weight matrices; column 0 is the bias column.
        activations: padded per-layer activations recorded by ``inference``
            (its ``outputs`` list).

    Returns:
        List with one delta vector per layer, ordered from the first layer to
        the output layer, each zero-padded to ``max(neurons)`` entries.
    """
    padded_width = max(neurons)

    # Output layer: error times the sigmoid derivative at the output.
    output_delta = activation_derivative(output) * (output - target)
    output_delta.resize(padded_width)

    # Walk from the last hidden layer back to the first; the list is built in
    # that (reverse) order and flipped once at the end.
    collected = [output_delta]
    for i in reversed(range(1, len(weights))):
        layer_act = activations[i-1][:neurons[i]]
        # Drop the bias column: the bias input has no upstream unit.
        outgoing = weights[i][:, 1:neurons[i]+1]
        layer_delta = (outgoing.T @ collected[-1]) * activation_derivative(layer_act)
        layer_delta.resize(padded_width)
        collected.append(layer_delta)

    collected.reverse()
    return collected

In [None]:
def backprop(weights, sample, target, lr=0.1, momentum=0.0, prev_updates=None):
    """Run one online backpropagation step for a single sample.

    The weight change of a step is
    ``lr * gradient + momentum * previous_weight_change``. The full change
    (including its momentum part) is what gets remembered for the next step, so
    momentum accumulates over consecutive steps. Previously only the
    ``lr * gradient`` part was stored.

    Args:
        weights: per-layer weight matrices; updated IN PLACE.
        sample: input vector.
        target: expected output vector.
        lr: learning rate.
        momentum: momentum coefficient (0 disables momentum).
        prev_updates: per-layer weight changes returned by the previous call,
            or None on the first call.

    Returns:
        Tuple ``(weights, error, new_prev_updates)`` where ``error`` is the
        mean squared error measured BEFORE this update and
        ``new_prev_updates`` has to be passed to the next call.
    """
    # Forward pass
    activations = []
    inputs = []
    output = inference(weights, sample, inputs, activations)
    # Calculate output error
    error = np.mean((output - target) ** 2)

    deltas = calc_deltas(output, target, weights, activations)

    # Initialize previous updates if needed
    if prev_updates is None:
        prev_updates = [np.zeros_like(w) for w in weights]

    # Update weights with momentum
    new_prev_updates = []
    for i in range(len(weights)):
        inp = inputs[i][np.newaxis, :]
        delt = deltas[i][:, np.newaxis]
        cols = inp.shape[1]
        # Outer product delta x input = gradient of every weight of this layer.
        step = lr * delt @ inp + momentum * prev_updates[i][:, :cols]
        weights[i][:, :cols] -= step

        if starting_bias == 0:
            # "No bias" mode: remove bias from the updates and weights
            step[:, 0] = 0
            weights[i][:, 0] = 0

        # Store the full step (gradient + momentum part) for the next iteration
        new_prev_updates.append(step)

    return (weights, error, new_prev_updates)

In [None]:
# Mean training error of every completed epoch; filled by the training loop.
errors = list()

In [None]:
# Training loop: online learning, the weights are updated after every sample.
prev_updates = None  # no momentum history before the first sample
for epoch in range(epochs):
    if shuffle:
        # Present the samples in a fresh random order each epoch.
        random.shuffle(training_data)
    epochError = []
    for sample, target in training_data:
        weights, error, prev_updates = backprop(
            weights,
            sample,
            target,
            lr=lr,
            momentum=momentum,
            prev_updates=prev_updates,
        )
        epochError.append(error)
    errors.append(np.mean(epochError))

    # Optional early stopping once the mean epoch error is low enough.
    if target_error is None:
        continue
    if errors[-1] <= target_error:
        print(f"Target error reached at epoch {epoch}")
        break

In [None]:
# Persist the per-epoch training error, one value per line.
with open(os.path.join(output_dir,"globalErrors.log"), "wb") as f:
    f.write("\n".join([str(e) for e in errors]).encode("utf-8"))

fig = plt.figure()
plt.plot(errors)
plt.title("Error over time")
plt.xlabel("Epoch")
plt.ylabel("Error")
# Save BEFORE showing, and through the figure handle: on backends where
# plt.show() closes the figure, a later plt.savefig() writes an empty image.
fig.savefig(os.path.join(output_dir,"globalErrors.png"))
plt.show()

In [None]:
# Store the trained weights next to the other artefacts of this run.
np.save(
    file=os.path.join(output_dir, "weights.npy"),
    arr=weights,
)

In [None]:
def write_and_print(string, file):
    """Echo a value to stdout and append it, as one line, to an open file.

    Args:
        string: any object; its ``str()`` form is what gets written.
        file: writable text-mode file object.
    """
    rendered = str(string)
    print(rendered)
    file.write(rendered + "\n")
def _describe_sample(sample, target, flipped_weights):
    """Run one test sample through the network and collect its diagnostics.

    Args:
        sample: input vector.
        target: expected output vector.
        flipped_weights: the weights as nested lists, layers in reverse order
            (identical for every sample, so the caller computes it once).

    Returns:
        Tuple ``(record, output)``: the JSON-serialisable record for
        inference.log.json and the raw network output.
    """
    activations = []
    output = inference(weights, sample, None, activations)
    record = {
        "in": sample.tolist(),
        "error": float(np.mean((output - target) ** 2)),
        "target": target.tolist(),
        "deltas": [d.tolist() for d in calc_deltas(output, target, weights, activations)],
        "weights": flipped_weights,
        # Layers are logged from the output layer back to the first layer.
        "activations": [a.tolist() for a in reversed(activations)],
    }
    return record, output


fileOutputs = []
_flipped_weights = np.flip(weights,0).tolist()
# Explicit UTF-8: the report contains non-ASCII text ("pomyłek"), which the
# platform default encoding cannot always represent.
with open(os.path.join(output_dir,"summary.log"), "w", encoding="utf-8") as summary:
    if(dataset == "iris"):
        outputs = []
        targets = []
        for sample, target in testing_data:
            fileOutput, inferenceOut = _describe_sample(sample, target, _flipped_weights)
            fileOutputs.append(fileOutput)

            # Predicted / true class = index of the largest output / target entry.
            outputs.append(np.argmax(inferenceOut))
            targets.append(np.argmax(target))

        # Pin the label set so the matrix and the report always cover every
        # class, even one that is absent from this test split and never
        # predicted; otherwise their size would not match iris.target_names.
        class_ids = list(range(len(iris.target_names)))

        write_and_print("Macierz pomyłek:", summary)
        cmatrix = confusion_matrix(targets, outputs, labels=class_ids)
        df = pd.DataFrame(cmatrix, index=iris.target_names, columns=iris.target_names)
        write_and_print(df, summary)
        write_and_print("", summary)
        write_and_print(
            pd.DataFrame(
                cmatrix.diagonal(),
                index=iris.target_names,
                columns=["Poprawne klasyfikacje"],
            ), summary
        )
        write_and_print("", summary)
        diagonal_sum = cmatrix.diagonal().sum()
        matrix_all = cmatrix.sum()
        percent = round(diagonal_sum / matrix_all * 100, 2)
        write_and_print(f"Poprawnie sklasyfikowano: {diagonal_sum}/{matrix_all} {percent}%", summary)
        write_and_print("", summary)
        write_and_print("Classification report:", summary)
        write_and_print(
            classification_report(
                targets, outputs, labels=class_ids, target_names=iris.target_names
            ),
            summary,
        )
    elif dataset == "autoasocjacja":
        write_and_print(f"Liczba epok: {len(errors)} \n", summary)
        for sample, target in testing_data:
            fileOutput, inferenceOut = _describe_sample(sample, target, _flipped_weights)
            fileOutputs.append(fileOutput)
            write_and_print(f"{sample} => {np.array2string(inferenceOut, precision=8, suppress_small=True)}", summary)

# Context manager so the JSON file is flushed and closed deterministically.
with open(os.path.join(output_dir,"inference.log.json"), "w", encoding="utf-8") as inference_log:
    json.dump(fileOutputs, inference_log, indent=4)