# Neural network
### Impementation

In [50]:
import numpy as np
import numpy.linalg as la
import pandas as pd 
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
import sys
import math
import inspect
from PIL import Image
import os

In [2]:
class ProgressBar:
    __slots__ = ['description', 'overall', 'done', 'prevp', 'len', 'closed']
    def __init__(self, description, overall):
        self.description = description
        self.overall = overall
        self.done = 0
        self.prevp = -1
        self.len = 40
        self.closed = False
        self.__show()
    def go(self):
        self.done += 1
        self.__show()
    def __show(self):
        p100 = int(round(100.0 * self.done / self.overall))
        if p100 > self.prevp:
            p = int(round(self.len * self.done / self.overall))
            print("\r%s: [%s%s] % 3d%%" % (self.description, '#' * p, '.' * (self.len - p), p100), end='')
            self.prevp = p100
            if p == self.len:
                self.close()
    def close(self):
        if not self.closed:
            self.closed = True
            print()

In [56]:
def arr_to_pic(a):
    return Image.fromarray(np.uint8(a))

def pic_to_arr(img):
    if len(img.shape) == 3:
        return np.array(img[:, :, 0])
    else:
        return img

def png_to_arr(img):
    img = np.array(img)
    img = pic_to_arr(img)
    img = np.divide(img, 255)
    return img

In [4]:
def bytes_from_file(filename, chunksize=8192):
    with open(filename, 'rb') as f:
        while True:
            chunk = f.read(chunksize)
            if chunk:
                for b in chunk:
                    yield b
            else:
                break

def read_train_labels(filename, need_buben):
    all_bytes = list(bytes_from_file(filename))
    def read(idx=[0]):
        idx[0] += 1
        return all_bytes[idx[0] - 1]
    def read_i32():
        return (int(read()) << 24) + (int(read()) << 16) + (int(read()) << 8) + int(read())
    buben = read_i32()
    assert "buben must be %d" % need_buben, buben == need_buben
    count = read_i32()
    labels = [read() for _ in range(count)]
    return np.array(labels)

def read_train_images(filename, need_buben):
    all_bytes = list(bytes_from_file(filename))
    def read(idx=[0]):
        idx[0] += 1
        return all_bytes[idx[0] - 1]
    def read_i32():
        return (int(read()) << 24) + (int(read()) << 16) + (int(read()) << 8) + int(read())
    buben = read_i32()
    assert "buben must be %d" % need_buben, buben == need_buben
    count = read_i32()
    rows = read_i32()
    columns = read_i32()
    print(count, rows, columns, count * rows * columns, file=sys.stderr)
    imgs = np.zeros((count, rows, columns))
    for x in range(count):
        for i in range(rows):
            for j in range(columns):
                imgs[x, i, j] = read() / 255
    return imgs

In [5]:
def draw_pics(images):
    for i, img in enumerate(images):
        arr_to_pic(img).save('pics/test_pic%02d.png' % i)

In [6]:
class Node:
    def __init__(self, func, ws):
        self.inodes = []
        self.onodes = []
        self.value = None
        self.func = func
        self.ws = ws
    def get(self):
        if self.value == None:
            assert len(self.inodes) == len(self.ws)
            gets = [n.get() for n in self.inodes]
            arg = sum(w * x for (w, x) in zip(self.ws, gets))
            self.value = self.func(arg)
        return self.value
    def clear(self):
        self.value = None
    def add_onode(self, onode):
        self.onodes.append(onode)
    def add_inode(self, inode):
        self.inodes.append(inode)
    def isbayes(self):
        return False
    def dump(self, file):
        print('node %s' % ' '.join(map(str, self.ws)), file=file)

class InputNode:
    def __init__(self, i, j):
        self.i = i
        self.j = j
        self.value = None
    def get(self, img=None):
        if self.value == None:
            self.value = img[self.i, self.j]
        return self.value
    def clear(self):
        self.value = None
    def add_onode(self, onode):
        pass
    def add_inode(self, inode):
        pass
    def isbayes(self):
        return False
    def dump(self, file):
        print('input %d %d' % (self.i, self.j), file=file)

class BayesUnit:
    def get(*args):
        return 1
    def clear():
        pass
    def add_onode(onode):
        pass
    def add_inode(inode):
        pass
    def isbayes():
        return True
    def dump(file):
        print('bayes', file=file)

def nodepipe(inode, onode):
    inode.add_onode(onode)
    onode.add_inode(inode)
    
class Layer:
    def __init__(self, nodes):
        self.nodes = nodes
    def add_node(self, node):
        self.nodes.append(node)
    def calc(self, *args, **kwargs):
        f = lambda node: node.get(*args)
        pool = kwargs['pool'] if 'pool' in kwargs.keys() else None
        if pool != None:
            return list(pool.map(f, self.nodes))
        else:
            return [f(node) for node in self.nodes]
    def clear(self):
        for node in self.nodes:
            node.clear()
    def size(self):
        return len(self.nodes)

def layerpipe(ilayer, olayer):
    for inode in ilayer.nodes:
        for onode in olayer.nodes:
            nodepipe(inode, onode)

In [7]:
def identity(x):
    return x

def sigmoid(x):
    return 1 / (1 + np.exp(-x))

def gaussian(mean, variance):
    std = np.sqrt(variance)
    return lambda: np.random.normal(mean, std)

In [8]:
def build_net(rows, columns, mid_size):
    size = rows * columns
    layers = []
    
    input_layer = Layer([])
    input_layer.add_node(BayesUnit)
    for i in range(rows):
        for j in range(columns):
            node = InputNode(i, j)
            input_layer.add_node(node)
    layers.append(input_layer)
            
    mid_layer = Layer([])
    mid_layer.add_node(BayesUnit)
    for i in range(mid_size):
        csize = 1 + layers[-1].size()
        node = Node(sigmoid, [0] * csize)
        mid_layer.add_node(node)
    layers.append(mid_layer)
    
    output_layer = Layer([])
    for i in range(10):
        csize = 1 + layers[-1].size()
        node = Node(sigmoid, [0] * csize)
        output_layer.add_node(node)
    layers.append(output_layer)
    
    for i in range(len(layers) - 1):
        layerpipe(layers[i], layers[i + 1])
        
    np.random.seed(1321)
    for layer in layers:
        for node in layer.nodes:
            if isinstance(node, Node):
                variance = 2 / (len(node.inodes) + len(node.onodes))
                gen = gaussian(0, variance)
                node.ws = [gen() for _ in node.inodes]
    return layers

In [9]:
import multiprocessing.dummy
thread_pool = multiprocessing.dummy.Pool(5)

In [10]:
def apply_net(layers, image):
    for layer in layers:
        layer.clear()
    layers[0].calc(image, pool=thread_pool)
    return np.array(layers[-1].calc(pool=thread_pool))

In [11]:
def train_local(layers, img, ans, lrate=0.1):
    cur_res = apply_net(layers, img)
    sig1 = np.zeros(cur_res.shape)
    for i in range(cur_res.shape[0]):
        sig1[i] = cur_res[i] * (1 - cur_res[i])

    epsM = cur_res - ans
    epsH = np.zeros(len(layers[-2].nodes))
    for h, pnode in enumerate(layers[-2].nodes):
        for m, node in enumerate(layers[-1].nodes):
            if node.isbayes():
                continue
            epsH[h] += epsM[m] * node.ws[h] * sig1[m]

    for m, node in enumerate(layers[-1].nodes):
        if node.isbayes():
            continue
        for h, pnode, in enumerate(layers[-2].nodes):
            node.ws[h] -= lrate * epsM[m] * sig1[m] * pnode.get()
    
    for h, pnode in enumerate(layers[-2].nodes):
        if pnode.isbayes():
            continue
        sigma = pnode.get() * (1 - pnode.get())
        for j, jnode in enumerate(layers[-3].nodes):
            pnode.ws[j] -= lrate * epsH[h] * jnode.get() * sigma
    return layers
            
def train_net(layers, images, answers, bsize=128, iters=1, epochs=1, lrate=0.1):
    cnt = len(images)
    bs, anss = [], []
    for i in range(0, cnt, bsize):
        j = min(i + bsize, cnt)
        bs.append(images[i : j, :])
        anss.append(answers[i : j, :])
    pbar = ProgressBar('Train net', epochs * len(images) * iters)
    for ep in range(epochs):
        for b, a in zip(bs, anss):
            for it in range(iters):
                for img, ans in zip(b, a):
                    pbar.go()
                    train_local(layers, img, ans, lrate)
    pbar.close()
    return layers

In [12]:
def dump_net(net, file):
    print('netsize', len(net), file=file)
    for layer in net:
        print('layersize', len(layer.nodes), file=file)
        for node in layer.nodes:
            node.dump(file)

def load_net(file, func):
    cw, cl = file.readline().strip().split()
    if cw != 'netsize':
        raise RuntimeError('\'netsize\' excepted, %r found' % cw)
    cl = int(cl)
    net = []
    for _ in range(cl):
        cw, cnt = file.readline().strip().split()
        if cw != 'layersize':
            raise RuntimeError('\'layersize\' excepted, %r found' % cw)
        cnt = int(cnt)
        layer = Layer([])
        for __ in range(cnt):
            tokens = file.readline().strip().split()
            if tokens[0] == 'node':
                ws = list(map(float, tokens[1:]))
                layer.add_node(Node(func, ws))
            elif tokens[0] == 'input':
                i, j = map(int, tokens[1:])
                layer.add_node(InputNode(i, j))
            elif tokens[0] == 'bayes':
                layer.add_node(BayesUnit)
            else:
                raise RuntimeError('unknown first token: %s' % tokens[0])
        net.append(layer)
    for i in range(len(net) - 1):
        layerpipe(net[i], net[i + 1])
    return net

def test_dump(net):
    fn = 'test.txt'
    with open(fn, 'w') as f:
        dump_net(net, f)
    with open(fn) as f:
        new_net = load_net(f, sigmoid)
    for l1, l2 in zip(net, new_net):
        assert(len(l1.nodes) == len(l2.nodes))
        for n1, n2 in zip(l1.nodes, l2.nodes):
            if isinstance(n1, Node):
                assert isinstance(n2, Node)
                assert len(n1.inodes) == len(n2.inodes)
                assert len(n1.onodes) == len(n2.onodes)
                diff = np.array(n1.ws) - np.array(n2.ws)
                assert np.absolute(diff).max() < 1e-7
            elif isinstance(n1, InputNode):
                assert isinstance(n2, InputNode)
                assert n1.i == n2.i and n1.j == n2.j
            else:
                assert n1 == BayesUnit
                assert n2 == BayesUnit
    print("Ok test", file=sys.stderr)

### Interface

In [13]:
def make_net(images, labels, mid_size=50, iters=1, lrate=0.1):
    answers = np.zeros((labels.shape[0], 10))
    for i in range(labels.shape[0]):
        answers[i, labels[i]] = 1
    rows, columns = images[0].shape
    layers = build_net(rows, columns, mid_size)
    layers = train_net(layers, images, answers, iters=iters, lrate=lrate)
    return layers

In [15]:
def test_net(net, images, labels):
    goods, overall = 0, labels.shape[0]
    pbar = ProgressBar('Test net', overall)
    for i in range(overall):
        result = np.argmax(apply_net(net, images[i]))
        if result == labels[i]:
            goods += 1
        pbar.go()
    pbar.close()
    print("ok %.3f%% (%d out of %d)" % (goods / overall * 100, goods, overall))

In [32]:
labels = read_train_labels('train-labels.idx1-ubyte', 2049)
images = read_train_images('train-images.idx3-ubyte', 2051)
test_labels = read_train_labels('t10k-labels.idx1-ubyte', 2049)
test_images = read_train_images('t10k-images.idx3-ubyte', 2051)

60000 28 28 47040000
10000 28 28 7840000


In [33]:
test_dump(make_net(images[:10], labels[:10]))

Train net: [########################################]  100%


Ok test


In [None]:
cnt = 1000
net = make_net(images[:cnt], labels[:cnt], iters=2, lrate=0.3)
test_net(net, test_images, test_labels)

In [61]:
def apply_on_pic(net, img):
    img = png_to_arr(img)
    result = np.argmax(apply_net(net, img))
    print("Result is %d" % result)
    
with open('nets/net-all-0.4.net') as file:
    net = load_net(file, sigmoid)
for path in os.listdir('pics/'):
    print(path.split('.')[0] + ": ", end='')
    apply_on_pic(net, Image.open('pics/' + path))

seven: Result is 7
three-1: Result is 3
three: Result is 3
