In [None]:
from __future__ import annotations
import graphviz
from IPython.display import display
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt
from typing import Literal
import time
import plotly.express as px

In [None]:
class Value:
    """Scalar node of a dynamically built computation graph (reverse-mode autodiff).

    Every arithmetic operation on ``Value`` objects returns a new ``Value`` that
    remembers its operands (``ancestors``) and a closure (``_backward``) which
    pushes the node's gradient back to those operands. Calling :meth:`backward`
    on a result walks the graph and accumulates ``grad`` on every reachable node.

    Only ordering comparisons are defined. ``==`` and hashing stay identity
    based, which :meth:`backward` relies on when it stores nodes in a set.

    Attributes:
        value: Numeric payload of the node.
        ancestors: Operand nodes this node was computed from (empty for leaves).
        name: Free-form label used by ``__repr__`` and ``plot_graph``.
        operand: Operator symbol that ``plot_graph`` draws as a separate node.
        grad: Gradient accumulated by ``backward``; starts at 0.0.
    """

    def __init__(
        self, value: float, ancestors: tuple[Value, ...] = (), name="", operand=""
    ):
        self.value = value
        self.ancestors = ancestors
        self.name = name
        self.grad = 0.0
        # Replaced by the operation that creates the node; leaves keep the no-op.
        self._backward = lambda: None
        self.operand = operand

    @staticmethod
    def _wrap(other) -> Value:
        """Return ``other`` unchanged if it is a ``Value``, else wrap it as a leaf."""
        return other if isinstance(other, Value) else Value(other)

    # make values printable
    def __repr__(self) -> str:
        return f"{self.name}, value={self.value}, grad={self.grad}"

    # ------------------------------------------------------------------
    # Addition
    # ------------------------------------------------------------------
    def _add(self, other, name: str, operand: str) -> Value:
        """Shared implementation of ``+`` and ``+=`` (they differ only in labels)."""
        other = self._wrap(other)
        result = Value(self.value + other.value, (self, other), name=name, operand=operand)

        def _backward():
            # d(a + b)/da = d(a + b)/db = 1
            self.grad += result.grad
            other.grad += result.grad

        result._backward = _backward
        return result

    def __add__(self, other: Value) -> Value:
        """Return ``self + other``; plain numbers are wrapped automatically."""
        return self._add(other, name="add", operand="+")

    def __iadd__(self, other: Value) -> Value:
        """Return a new node for ``self += other`` (the left operand is not mutated)."""
        return self._add(other, name="iadd", operand="+=")

    def __radd__(self, other: Value):
        """Return ``other + self``; addition is commutative, so reuse ``__add__``."""
        return self + other

    # ------------------------------------------------------------------
    # Subtraction
    # ------------------------------------------------------------------
    def __sub__(self, other: Value) -> Value:
        """Return ``self - other``."""
        other = self._wrap(other)
        result = Value(self.value - other.value, (self, other), name="sub", operand="-")

        def _backward():
            self.grad += 1.0 * result.grad
            other.grad += -1.0 * result.grad

        result._backward = _backward
        return result

    def __rsub__(self, other: Value) -> Value:
        """Return ``other - self`` (operands are NOT interchangeable)."""
        return self._wrap(other) - self

    # ------------------------------------------------------------------
    # Multiplication
    # ------------------------------------------------------------------
    def __mul__(self, other: Value) -> Value:
        """Return ``self * other``."""
        other = self._wrap(other)
        result = Value(self.value * other.value, (self, other), name="mul", operand="*")

        def _backward():
            self.grad += other.value * result.grad
            other.grad += self.value * result.grad

        result._backward = _backward
        return result

    def __rmul__(self, other: Value) -> Value:
        """Return ``other * self``; multiplication is commutative."""
        return self * other

    # ------------------------------------------------------------------
    # Floating point division
    # ------------------------------------------------------------------
    def __truediv__(self, other: Value) -> Value:
        """Return ``self / other``."""
        other = self._wrap(other)
        result = Value(self.value / other.value, (self, other), name="div", operand="/")

        def _backward():
            self.grad += 1 / other.value * result.grad
            other.grad += -self.value / other.value**2 * result.grad

        result._backward = _backward
        return result

    def __rtruediv__(self, other: Value) -> Value:
        """Return ``other / self`` (operands are NOT interchangeable)."""
        return self._wrap(other) / self

    # ------------------------------------------------------------------
    # Power (x**n)
    # ------------------------------------------------------------------
    def __pow__(self, other: Value) -> Value:
        """Return ``self ** other``.

        The gradient with respect to the exponent involves ``log(base)`` and is
        only accumulated for a strictly positive base; for other bases it is
        left untouched instead of being poisoned with NaN / -inf.
        """
        other = self._wrap(other)
        result = Value(self.value**other.value, (self, other), name="pow", operand="^")

        def _backward():
            base, exponent = self.value, other.value
            # d(x**n)/dx = n * x**(n - 1)
            self.grad += exponent * base ** (exponent - 1.0) * result.grad
            # d(x**n)/dn = x**n * ln(x), defined over the reals only for x > 0
            if base > 0:
                other.grad += base**exponent * np.log(base) * result.grad

        result._backward = _backward
        return result

    def __rpow__(self, other: Value) -> Value:
        """Return ``other ** self`` for a plain-number base."""
        return self._wrap(other) ** self

    # ------------------------------------------------------------------
    # Exponential (e**x) and natural logarithm
    # ------------------------------------------------------------------
    def exp(self) -> Value:
        """Return ``e ** self``."""
        result = Value(np.exp(self.value), (self,), name="exp", operand="e^")

        def _backward():
            # d(e**x)/dx = e**x, which is exactly the forward result
            self.grad += result.value * result.grad

        result._backward = _backward
        return result

    def log(self) -> Value:
        """Return the natural logarithm of ``self``."""
        result = Value(np.log(self.value), (self,), name="log")

        def _backward():
            self.grad += 1 / self.value * result.grad

        result._backward = _backward
        return result

    # ------------------------------------------------------------------
    # Negation
    # ------------------------------------------------------------------
    def __neg__(self) -> Value:
        """Return ``-self``."""
        result = Value(-self.value, (self,), name="neg", operand="-")

        def _backward():
            self.grad += -result.grad

        result._backward = _backward
        return result

    # ------------------------------------------------------------------
    # Activation functions
    # ------------------------------------------------------------------
    def sigmoid(self) -> Value:
        """Return the logistic sigmoid ``1 / (1 + e**-x)`` of ``self``."""
        x = self.value
        # Pick the algebraically equivalent form whose exp() argument is <= 0,
        # so very negative inputs cannot overflow.
        if x >= 0:
            sigmoid_value = 1 / (1 + np.exp(-x))
        else:
            exp_x = np.exp(x)
            sigmoid_value = exp_x / (1 + exp_x)
        result = Value(sigmoid_value, (self,), name="sigmoid")

        def _backward():
            # sigma'(x) = sigma(x) * (1 - sigma(x))
            self.grad += sigmoid_value * (1 - sigmoid_value) * result.grad

        result._backward = _backward
        return result

    def relu(self) -> Value:
        """Return ``max(self, 0)``; the gradient is 0 for non-positive inputs."""
        result_value = self.value if self.value > 0 else 0.0
        result = Value(result_value, (self,), name="ReLU")

        def _backward():
            # bool * float: passes the gradient through only where the input was > 0
            self.grad += (self.value > 0) * result.grad

        result._backward = _backward
        return result

    def leaky_relu(self, alpha=0.01) -> Value:
        """Return ``self`` if positive, else ``alpha * self``."""
        result_value = self.value if self.value > 0 else alpha * self.value
        result = Value(result_value, (self,), name="LeakyReLU")

        def _backward():
            self.grad += (1 if self.value > 0 else alpha) * result.grad

        result._backward = _backward
        return result

    # ------------------------------------------------------------------
    # Comparison operators <, >, <=, >= (compare the wrapped numbers)
    # ------------------------------------------------------------------
    def __lt__(self, other: Value) -> bool:
        return self.value < self._wrap(other).value

    def __gt__(self, other: Value) -> bool:
        return self.value > self._wrap(other).value

    def __le__(self, other: Value) -> bool:
        return self.value <= self._wrap(other).value

    def __ge__(self, other: Value) -> bool:
        return self.value >= self._wrap(other).value

    # ------------------------------------------------------------------
    # Backpropagation
    # ------------------------------------------------------------------
    def backward(self) -> None:
        """Backpropagate from this node through every node it depends on.

        Sets ``self.grad`` to 1.0 and then runs each node's ``_backward`` in
        reverse topological order. Gradients are *accumulated* (``+=``); the
        caller is responsible for zeroing ``grad`` on nodes that are reused.
        """
        topo_sorted_nodes = []
        visited = {self}

        # Iterative depth-first post-order. It visits nodes in the same order a
        # recursive traversal would, but cannot exceed the recursion limit on
        # long operation chains (e.g. sums over many terms).
        stack = [(self, iter(self.ancestors))]
        while stack:
            node, pending = stack[-1]
            for ancestor in pending:
                if ancestor not in visited:
                    visited.add(ancestor)
                    stack.append((ancestor, iter(ancestor.ancestors)))
                    break
            else:
                # every ancestor handled -> node may be emitted
                topo_sorted_nodes.append(node)
                stack.pop()

        self.grad = 1.0
        for node in reversed(topo_sorted_nodes):
            node._backward()

    # ------------------------------------------------------------------
    # Visualisation
    # ------------------------------------------------------------------
    def plot_graph(self):
        """Render the computation graph that produced this node with graphviz.

        Each ``Value`` becomes a record node showing name, value and grad;
        nodes with an ``operand`` get an extra operator node in front of them.
        Named leaves are highlighted in light green.
        """
        dot = graphviz.Digraph(format="svg", graph_attr={"rankdir": "LR"})
        drawn = set()

        def add_nodes(dot: graphviz.Digraph, node: Value):
            unique_node_name = str(id(node))
            # A node shared by several consumers is drawn only once; the caller
            # still adds one edge per use.
            if unique_node_name in drawn:
                return unique_node_name
            drawn.add(unique_node_name)

            label = f"{node.name}|value={node.value}|grad={node.grad}"

            # add value node to graph
            dot.node(
                name=unique_node_name,
                label=label,
                shape="record",
                color=(
                    "lightgreen" if node.ancestors == () and node.name != "" else None
                ),  # highlight named inputs
                style="filled",
            )

            op_name = unique_node_name + node.operand
            if node.operand:  # check if there is an operand to display
                # add operation node and connect it to its result
                dot.node(
                    name=op_name,
                    label=node.operand,
                )
                dot.edge(op_name, unique_node_name)

            # iterate through the ancestors to build the whole graph
            for ancestor in node.ancestors:
                ancestor_name = add_nodes(dot, ancestor)
                # ancestor edges go to the operator node if there is one
                dot.edge(ancestor_name, op_name if node.operand else unique_node_name)

            return unique_node_name

        add_nodes(dot, self)
        display(dot)

In [None]:
def parse_mnist_data(
    idx_file_training_samples: str,
    idx_file_training_labels: str,
    number_1: int,
    number_2: int,
    target_size: tuple[int, int] = (10, 10),
) -> tuple[np.ndarray, np.ndarray]:
    """Load an IDX image/label pair, keep two digit classes and downscale the images.

    Args:
        idx_file_training_samples: Path to the IDX file holding the images.
        idx_file_training_labels: Path to the IDX file holding the labels.
        number_1: First label value to keep.
        number_2: Second label value to keep.
        target_size: ``(width, height)`` passed to Pillow's ``resize``.
            Defaults to the previously hard-coded ``(10, 10)``.

    Returns:
        ``(samples, labels)`` where ``samples`` has shape
        ``(k, height, width)`` with pixel values divided by 255 and ``labels``
        holds the matching rows of the label array (values are returned as
        stored, i.e. ``number_1`` / ``number_2``, not remapped).
    """
    training_labels = parse_mnist_labels(idx_file_training_labels)
    training_samples = parse_mnist_images(idx_file_training_samples)

    # boolean mask selecting only the two requested classes
    flat_labels = training_labels.flatten()
    mask = (flat_labels == number_1) | (flat_labels == number_2)
    filtered_labels = training_labels[mask]
    filtered_samples = training_samples[mask]

    width, height = target_size
    if len(filtered_samples) == 0:
        # keep the (n, height, width) layout even when nothing matched
        return np.empty((0, height, width), dtype=np.float64), filtered_labels

    # Downscale with Pillow. LANCZOS is a sinc-based resampling filter
    # (sinc(x) = sin(pi x) / (pi x)) intended for high-quality downscaling.
    downscaled_samples = np.array(
        [
            np.asarray(
                Image.fromarray(img).resize((width, height), Image.Resampling.LANCZOS)
            )
            for img in filtered_samples
        ]
    )

    # scale 8-bit pixel values to [0, 1]
    downscaled_samples = downscaled_samples / 255

    return downscaled_samples, filtered_labels


def parse_mnist_images(idx_file_path: str) -> np.ndarray:
    """Read an IDX3 image file (MNIST format) into a ``uint8`` array.

    The file starts with four big-endian 32-bit integers (magic number, image
    count, rows, columns) followed by one byte per pixel.

    Args:
        idx_file_path: Path to the IDX image file.

    Returns:
        Read-only array of shape ``(num_images, num_rows, num_cols)``.

    Raises:
        ValueError: If the header is incomplete, the magic number is not
            ``0x00000803`` or the pixel payload is shorter than announced.
    """
    with open(idx_file_path, "rb") as f:
        header = f.read(16)
        data = f.read()

    if len(header) != 16:
        raise ValueError(f"{idx_file_path}: incomplete IDX image header")

    magic, num_img, num_rows, num_cols = (
        int.from_bytes(header[pos : pos + 4], "big") for pos in range(0, 16, 4)
    )
    # 0x08 = unsigned byte data, 0x03 = three dimensions
    if magic != 0x00000803:
        raise ValueError(
            f"{idx_file_path}: unexpected magic number {magic:#010x} for an IDX image file"
        )

    expected = num_img * num_rows * num_cols
    if len(data) < expected:
        raise ValueError(
            f"{idx_file_path}: expected {expected} pixel bytes, found {len(data)}"
        )

    # frombuffer creates a view on the immutable bytes object (no copy)
    return np.frombuffer(data, dtype=np.uint8, count=expected).reshape(
        num_img, num_rows, num_cols
    )


def parse_mnist_labels(idx_file_path: str) -> np.ndarray:
    """Read an IDX1 label file (MNIST format) into a ``uint8`` column vector.

    The file starts with two big-endian 32-bit integers (magic number, item
    count) followed by one byte per label.

    Args:
        idx_file_path: Path to the IDX label file.

    Returns:
        Read-only array of shape ``(num_items, 1)``.

    Raises:
        ValueError: If the header is incomplete, the magic number is not
            ``0x00000801`` or the label payload is shorter than announced.
    """
    with open(idx_file_path, "rb") as f:
        header = f.read(8)
        data = f.read()

    if len(header) != 8:
        raise ValueError(f"{idx_file_path}: incomplete IDX label header")

    magic = int.from_bytes(header[:4], "big")
    num_item = int.from_bytes(header[4:], "big")
    # 0x08 = unsigned byte data, 0x01 = one dimension
    if magic != 0x00000801:
        raise ValueError(
            f"{idx_file_path}: unexpected magic number {magic:#010x} for an IDX label file"
        )

    if len(data) < num_item:
        raise ValueError(
            f"{idx_file_path}: expected {num_item} label bytes, found {len(data)}"
        )

    # frombuffer creates a view on the immutable bytes object (no copy)
    return np.frombuffer(data, dtype=np.uint8, count=num_item).reshape(num_item, 1)


def plot_image(img: np.ndarray) -> plt.Figure:
    """Draw a single grayscale image without axes and return the figure.

    Args:
        img: 2-dimensional array holding one image.

    Returns:
        The matplotlib figure containing the image. The figure is closed in
        pyplot before returning so it is not additionally rendered by pyplot's
        own figure management; display the returned object instead.

    Raises:
        AssertionError: If ``img`` is not 2-dimensional.
    """
    assert len(img.shape) == 2, "input must be 2-dimensional (single image)"

    fig, ax = plt.subplots()
    ax.axis("off")
    ax.imshow(img * 255, cmap="gray")

    # Close exactly this figure; a bare plt.close() would close whichever
    # figure happens to be current.
    plt.close(fig)
    return fig

In [None]:
# Seed NumPy's global random generator with a fixed value so that code drawing
# from np.random after this point produces the same numbers on every run.
np.random.seed(0xDEADBEEF)


class Neuron:
    """Single linear unit: ``bias + sum(weight_i * x_i)`` (no activation).

    Weights are drawn from ``np.random.random`` (uniform on [0, 1)); the bias
    starts at 0.0.

    NOTE(review): all-positive initial weights make the pre-activation grow with
    the number of inputs, which can saturate a following sigmoid - consider a
    zero-centred, scaled initialisation.
    """

    def __init__(self, num_inputs: int) -> None:
        """Create ``num_inputs`` random weights and a zero bias."""
        self.weights = [Value(np.random.random(size=None)) for _ in range(num_inputs)]
        self.bias = Value(0.0, name="bias")

    def __call__(self, x: np.ndarray) -> Value:
        """Return ``bias + sum(weights * x)``.

        Args:
            x: Input values; a NumPy array of any shape is flattened first,
                any other sized sequence is used as is.

        Raises:
            ValueError: If the number of inputs differs from the number of
                weights (previously the longer side was silently truncated).
        """
        if isinstance(x, np.ndarray):
            x = x.flatten()
        if len(x) != len(self.weights):
            raise ValueError(
                f"expected {len(self.weights)} inputs, got {len(x)}"
            )
        res = sum(w_i * x_i for w_i, x_i in zip(self.weights, x)) + self.bias
        return res

    def parameters(self) -> list[Value]:
        """Return all trainable values: the weights followed by the bias."""
        return self.weights + [self.bias]

    def param_count(self) -> int:
        """Return the number of trainable values (weights plus one bias)."""
        return len(self.weights) + 1


class Layer:
    """A group of neurons that all see the same input, followed by an activation."""

    def __init__(
        self,
        num_inputs: int,
        num_outputs: int,
        use_activation: Literal["relu", "sigmoid"],
    ) -> None:
        """Build ``num_outputs`` neurons, each taking ``num_inputs`` inputs."""
        self.neurons = []
        for _ in range(num_outputs):
            self.neurons.append(Neuron(num_inputs))
        self.use_activation = use_activation

    def __call__(self, x: np.ndarray) -> list[Value]:
        """Evaluate every neuron on ``x`` and apply the layer's activation.

        ``"relu"`` selects ReLU; any other setting falls back to sigmoid.
        """
        # all pre-activations are computed before any activation is applied
        pre_activations = []
        for neuron in self.neurons:
            pre_activations.append(neuron(x))

        activated = []
        if self.use_activation == "relu":
            for pre in pre_activations:
                activated.append(pre.relu())
        else:
            for pre in pre_activations:
                activated.append(pre.sigmoid())
        return activated

    def parameters(self) -> list:
        """Return the parameters of all neurons as one flat list, neuron by neuron."""
        collected = []
        for neuron in self.neurons:
            collected.extend(neuron.parameters())
        return collected


class MLP:
    """Multi-layer perceptron: ReLU hidden layers followed by a sigmoid output layer."""

    def __init__(self, num_inputs: int, num_hidden: list[int], num_out: int) -> None:
        """Build the layer stack.

        Args:
            num_inputs: Number of input features.
            num_hidden: Width of each hidden layer, in order. May be empty, in
                which case the inputs feed the output layer directly.
            num_out: Number of neurons in the output layer.
        """
        size = [num_inputs] + list(num_hidden)
        hidden_layers = [
            Layer(n_in, n_out, "relu") for n_in, n_out in zip(size, size[1:])
        ]
        # size[-1] is the last hidden width, or num_inputs when there is no
        # hidden layer (num_hidden[-1] would raise IndexError in that case).
        self.layers = hidden_layers + [Layer(size[-1], num_out, "sigmoid")]

    def __call__(self, x: np.ndarray) -> Value:
        """Feed ``x`` through all layers and return the FIRST output of the last layer."""
        for layer in self.layers:
            x = layer(x)
        return x[0]

    def parameters(self) -> list:
        """Return the parameters of all layers as one flat list, layer by layer."""
        params = [p for l in self.layers for p in l.parameters()]
        return params

In [None]:
# Manual check of the autodiff engine: build sigmoid(a * b / c) and backpropagate.
a = Value(2.9)
b = Value(3.5)
c = Value(-1.0)

# a * b is evaluated first, then divided by c
res = a * b / c

foo = res.sigmoid()
# fills .grad on foo and on every node it was computed from
foo.backward()

# no-op statement — NOTE(review): presumably left as a debugger anchor; confirm and remove
pass

In [None]:
# Manual check of ReLU: the product a * b * c is negative here, so the ReLU
# output is 0.0 and the gradient passed back through it is 0 as well.
a = Value(2.9)
b = Value(3.5)
c = Value(-1.0)

res = a * b * c

foo = res.relu()
# fills .grad on foo and on every node it was computed from
foo.backward()

# no-op statement — NOTE(review): presumably left as a debugger anchor; confirm and remove
pass

In [None]:
def res_loss(y_pred: Value, y_gt) -> Value:
    """Return the squared residual ``(y_gt - y_pred) ** 2`` as a graph node.

    Args:
        y_pred: Prediction node.
        y_gt: Ground-truth label; must provide ``.item()`` (e.g. a NumPy
            scalar or one-element array).

    Returns:
        A ``Value`` holding the squared difference.
    """
    y_gt = Value(y_gt.item(), (), name="ground truth")
    residual = y_gt - y_pred
    # Square by multiplication instead of ``** 2``: the power operator's
    # backward pass also differentiates with respect to the exponent, which
    # takes log(residual) and is undefined whenever the residual is <= 0.
    loss = residual * residual
    return loss


def cross_entropy_loss(y_pred: Value, y_gt) -> Value:
    """Return the binary cross-entropy ``-(y*log(p) + (1-y)*log(1-p))`` as a graph node.

    The prediction is clipped to ``[1e-15, 1 - 1e-15]`` before taking the
    logarithm so the loss stays finite for predictions of exactly 0 or 1.

    Args:
        y_pred: Prediction node; its value is interpreted as a probability.
        y_gt: Ground-truth label; must provide ``.item()`` (e.g. a NumPy
            scalar or one-element array).

    Returns:
        A ``Value`` whose ancestors are ``y_pred`` and the wrapped label and
        whose backward step propagates the loss gradient to both of them.
    """
    epsilon = 1e-15
    y_pred_value_clipped = np.clip(y_pred.value, epsilon, 1 - epsilon)

    y_gt = Value(y_gt.item(), ())
    target = y_gt.value
    log_loss_positive = target * np.log(y_pred_value_clipped)
    log_loss_negative = (1 - target) * np.log(1 - y_pred_value_clipped)

    loss = Value(
        -(log_loss_positive + log_loss_negative),
        (y_pred, y_gt),
        name="cross_entropy_loss",
    )

    def _backward():
        # Without this closure the node keeps the default no-op and no gradient
        # ever reaches y_pred, i.e. the network cannot learn from this loss.
        # dL/dp = -y/p + (1-y)/(1-p), evaluated at the clipped prediction so
        # the gradient stays finite.
        y_pred.grad += (
            -target / y_pred_value_clipped
            + (1 - target) / (1 - y_pred_value_clipped)
        ) * loss.grad
        # dL/dy = log(1-p) - log(p)
        y_gt.grad += (
            np.log(1 - y_pred_value_clipped) - np.log(y_pred_value_clipped)
        ) * loss.grad

    loss._backward = _backward
    return loss

In [None]:
# load images
# paths are relative to the notebook's working directory
train_img_path = "../data/train_img.idx"
train_label_path = "../data/train_label.idx"
test_img_path = "../data/test_img.idx"
test_label_path = "../data/test_label.idx"

# keep only the samples labelled 0 or 1
train_img, train_label = parse_mnist_data(train_img_path, train_label_path, 0, 1)
# only get the first 1000 images and labels
train_img = train_img[:1000]
train_label = train_label[:1000]

# the test split is filtered the same way but not truncated
test_img, test_label = parse_mnist_data(test_img_path, test_label_path, 0, 1)

# initialize MLP
# 100 inputs, one hidden layer with 10 neurons, a single output
nin = 100
n_hidden = [10]
nout = 1
mlp = MLP(nin, n_hidden, nout)

In [None]:
np.random.seed(0xDEADBEEF)
# Hyperparameters
lr = 1e-3
epochs = 1
batch_size = 30
num_img = train_img.shape[0]
# Ceiling division: unlike int(n / batch_size) + 1 this never produces an empty
# trailing batch (and a division by zero) when num_img is a multiple of
# batch_size.
num_batches = (num_img + batch_size - 1) // batch_size

losses_train = []
correct_test_pred = 0
correct_train_pred = 0
accuracies_test = []
accuracies_train = []
times = []


def _count_correct(images, labels) -> int:
    """Count samples whose prediction of ``mlp`` lies within 0.5 of the label."""
    return sum(
        1
        for img, lab in zip(images, labels)
        if np.fabs(mlp(img).value - lab.item()) < 0.5
    )


test = mlp.parameters()
# Snapshot of the numeric values: ``test`` holds the very same Value objects
# that mlp.parameters() returns later, so comparing the lists themselves can
# never detect an update.
initial_param_values = [p.value for p in test]

for e in range(epochs):
    # Accuracy is computed once per epoch (before that epoch's updates), for
    # the training and the test split. Each split is evaluated completely and
    # on its own images, so the denominators below match.
    print("calculating accuracies...")
    correct_test_pred = _count_correct(test_img, test_label)
    correct_train_pred = _count_correct(train_img, train_label)

    accuracies_test.append(correct_test_pred / len(test_img))
    accuracies_train.append(correct_train_pred / len(train_img))
    print("done")

    # measure the duration of the epoch (CPU time)
    start_time = time.process_time()

    idx = np.random.permutation(np.arange(num_img))
    # shuffle in place; images and labels use the same permutation
    train_img[:] = train_img[idx]
    train_label[:] = train_label[idx]

    for b in range(num_batches):
        start_sample = b * batch_size
        end_sample = min((b + 1) * batch_size, num_img)
        x = train_img[start_sample:end_sample].reshape(-1, 10, 10)
        y_gt = train_label[start_sample:end_sample]

        # zero grad
        for p in mlp.parameters():
            p.grad = 0.0

        # forward pass
        y_pred = [mlp(img) for img in x]

        # backward pass on the mean loss of the batch
        outputs = [cross_entropy_loss(ypred, ygt) for ypred, ygt in zip(y_pred, y_gt)]
        loss = sum(outputs) / len(outputs)
        loss.backward()
        if b % 5 == 0 or b + 1 == num_batches:
            print(f"Epoche: {e+1}, Batch: {b+1} / {num_batches} Loss: {loss.value}")
        losses_train.append(loss.value)

        # optimization: plain gradient descent step
        for p in mlp.parameters():
            p.value -= lr * p.grad

    correct_train_pred = 0
    correct_test_pred = 0
    end_time = time.process_time()
    times.append(end_time - start_time)
    print(f"Epoche {e+1}: {times[e]} s")

assert any(
    p.value != initial_value
    for p, initial_value in zip(mlp.parameters(), initial_param_values)
), "Parameter werden nicht geändert!"

In [None]:
# Loss on the test data
epochs = 10
batch_size = 30
num_img = test_img.shape[0]
# Ceiling division: never yields an empty trailing batch (and a division by
# zero below) when num_img is a multiple of batch_size.
num_batches = (num_img + batch_size - 1) // batch_size

losses_test = []

# NOTE(review): the model is not updated inside this loop, so every epoch
# appends the same sequence of batch losses - confirm whether 10 passes are
# intended.
for e in range(epochs):
    print(f"starting epoch {e+1}...")
    for b in range(num_batches):
        start_sample = b * batch_size
        end_sample = min((b + 1) * batch_size, num_img)
        x_test = test_img[start_sample:end_sample]
        y_gt_test = test_label[start_sample:end_sample]

        # forward pass
        y_pred_test = [mlp(img) for img in x_test]

        # calculate the mean loss of the batch (no backward pass, no update)
        outputs_test = [
            cross_entropy_loss(ypred, ygt) for ypred, ygt in zip(y_pred_test, y_gt_test)
        ]

        loss_test = sum(outputs_test) / len(outputs_test)
        losses_test.append(loss_test.value)

    print(f"epoch {e+1} done")

In [None]:
# CPU time spent on each training epoch
px.line(
    x=range(len(times)),
    y=times,
    labels={"x": "epoch", "y": "duration [s]"},
    title="Training time per epoch",
)

In [None]:
# mean cross-entropy loss of every training batch, in processing order
px.line(
    x=range(len(losses_train)),
    y=losses_train,
    labels={"x": "batch", "y": "loss"},
    title="Training loss per batch",
)

In [None]:
# mean cross-entropy loss of every test batch, in processing order
px.line(
    x=range(len(losses_test)),
    y=losses_test,
    labels={"x": "batch", "y": "loss"},
    title="Test loss per batch",
)

In [None]:
# test accuracy, one entry per epoch
px.line(
    x=range(len(accuracies_test)),
    y=accuracies_test,
    labels={"x": "epoch", "y": "accuracy"},
    title="Test accuracy per epoch",
)

In [None]:
# training accuracy, one entry per epoch
px.line(
    x=range(len(accuracies_train)),
    y=accuracies_train,
    labels={"x": "epoch", "y": "accuracy"},
    title="Training accuracy per epoch",
)