In [1]:
%reload_ext autoreload
%autoreload 2

In [4]:
from ml.dataset.mnist import load as load_mnist, init as init_mnist
import seaborn as sbs
import numpy as np

MNIST_PATH = "../data/mnist"
x_train, y_train, x_test, y_test = load_mnist(MNIST_PATH)
x_train = np.array(((x_train / 256) - 0.5) / 0.5)  # Normalize
y_train = np.array([[int(x == y) for x in range(10)] for y in y_train])
x_test = np.array(((x_test / 256) - 0.5) / 0.5)  # Normalize
y_test = np.array([[int(x == y) for x in range(10)] for y in y_test])

In [5]:
from ml.nn.base import NeuralNetwork
from ml.nn.optim import SGD, Adagrad
from ml.nn.layer.linear import Linear
from ml.nn.loss import MSELoss, BinaryCrossEntropyLoss
from ml.nn.activation import Softmax, ReLU, LeakyReLU, Sigmoid
import random
import json

nn = NeuralNetwork([
    Linear(784, 512),
    LeakyReLU(),
    Linear(512, 10),
    Softmax()
], BinaryCrossEntropyLoss(), SGD(1e-4))

In [8]:
from typing import Tuple


def train_nn(batch_size: int, epoch_size: int, save_path: str = "../model.json") -> Tuple[np.ndarray, np.ndarray]:
    data_size = len(x_train)
    data_pos = np.arange(len(x_train))

    loss_data = np.zeros(len(range(0, data_size, batch_size)) * epoch_size)
    loss_data_on = 0

    loss_epoch = np.zeros(epoch_size)

    for epoch in range(1, epoch_size + 1):
        np.random.shuffle(data_pos)
        loss_total = 0.
        for i in range(0, data_size, batch_size):
            p = data_pos[i:min(i + batch_size, data_size)]
            nn.zero_grad()
            result = nn(x_train[p])
            loss = nn.loss(y_train[p])
            nn.backward()
            nn.step()
            loss_total += loss
            print(f"\r{epoch}; {i}/{data_size}; {round(loss.item(), 5)} {' ' * 30}", end="")

            loss_data[loss_data_on] = loss
            loss_data_on += 1

        loss_total /= len(range(0, data_size, batch_size))
        print(f"\r{epoch}; Loss total: {round(loss_total, 5)}. {' ' * 30}")

        loss_epoch[epoch - 1] = loss_total

    if save_path is not None:
        with open(save_path, "w+") as f:
            f.write(json.dumps(nn.state_dict()))

    return loss_data, loss_epoch


loss_on_iter, loss_on_epoch = train_nn(1024, 20)

1; 10240/60000; 0.05559                               

KeyboardInterrupt: 

In [None]:
sbs.lineplot(y=loss_on_iter, x=np.arange(loss_on_iter.shape[0]) + 1)

In [None]:
sbs.lineplot(y=loss_on_epoch, x=np.arange(len(loss_on_epoch)) + 1)

In [None]:
from typing import Tuple


def test_nn(batch_size: int = 256, model_path: str = "../model.json",
            data_set: Tuple[np.ndarray, np.ndarray] = (x_test, y_test)) -> Tuple[int, int, int, float]:
    with open(model_path, "r+") as f:
        nn.load_state_dict(json.load(f))
    data, lbl = data_set

    total = len(data)
    data_pos = np.arange(total)

    correct = 0
    loss = 0.
    epoch_cnt = len(range(0, total, batch_size))

    cnt = 0
    for i in range(0, total, batch_size):
        p = data_pos[i: min(i + batch_size, total)]
        x = data[p]
        y = lbl[p]

        nn.zero_grad()
        result = nn(x)

        loss_round = nn.loss(y)

        loss += loss_round
        correct_round = np.sum(np.argmax(result, axis=1) == np.argmax(y, axis=1))
        correct += correct_round

        cnt += 1
        print(f"\r{cnt}/{epoch_cnt}; Loss: {round(loss_round, 5)}; "
              f"Accuracy: {round(correct_round / x.shape[0], 5)} {' ' * 30}", end="")

    print()
    print(f"Test Result:")
    print(f"Correct: {correct}")
    print(f"Wrong: {total - correct}")
    print(f"Loss: {round(loss / epoch_cnt, 5)}")
    print(f"Accuracy: {round(correct / total, 5)}")

    return total, correct, total - correct, loss


test_nn()

# Compare to traditional algorithms

In [None]:
import sklearn
from sklearn.linear_model import LinearRegression
from sklearn.ensemble import RandomForestClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import classification_report

In [None]:
def report(model):
    out, targ = np.argmax(y_test, axis=1), np.argmax(lin_reg.predict(x_test), axis=1)
    report = classification_report(out, targ)
    print(report)
    correct = np.sum(out == targ)
    return out.shape[0], correct, out.shape[0] - correct

## Linear Regression

In [None]:
lin_reg = LinearRegression()
lin_reg.fit(x_train, y_train)

In [None]:
total, lin_reg_correct, lin_reg_wrong = report(lin_reg)

## Random Forest

In [None]:
forest = RandomForestClassifier()
forest.fit(x_train, y_train)

In [None]:
total, forest_correct, forest_wrong = report(forest)

## Decision Tree

In [None]:
d_tree = DecisionTreeClassifier()
d_tree.fit(x_train, np.argmax(y_train, axis=1))

In [None]:
d_tree_result = d_tree.predict(x_test)
d_tree_correct = np.sum(d_tree_result == np.argmax(y_test, axis=1))
d_tree_wrong = total - d_tree_correct
print(classification_report(np.argmax(y_test, axis=1), d_tree_result))