<a href="https://colab.research.google.com/github/fatisepah/samples/blob/CNN-from-scratch/implementation_lenet_from_scratch_2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [20]:
import os
import functools
import operator
import gzip
import struct
import array
import numpy as np
from urllib import urlretrieve
import sys

#from: https://github.com/datapythonista/mnist
DATASET_DIRECTORY = 'data/'
URL = 'http://yann.lecun.com/exdb/mnist/'

def parse_idx(fd):
    DATA_TYPES = {0x08: 'B',  # unsigned byte
                  0x09: 'b',  # signed byte
                  0x0b: 'h',  # short (2 bytes)
                  0x0c: 'i',  # int (4 bytes)
                  0x0d: 'f',  # float (4 bytes)
                  0x0e: 'd'}  # double (8 bytes)

    header = fd.read(4)
    if len(header) != 4:
        raise IdxDecodeError('Invalid IDX file, file empty or does not contain a full header.')

    zeros, data_type, num_dimensions = struct.unpack('>HBB', header)

    if zeros != 0:
        raise IdxDecodeError('Invalid IDX file, file must start with two zero bytes. '
                             'Found 0x%02x' % zeros)

    try:
        data_type = DATA_TYPES[data_type]
    except KeyError:
        raise IdxDecodeError('Unknown data type 0x%02x in IDX file' % data_type)

    dimension_sizes = struct.unpack('>' + 'I' * num_dimensions,
                                    fd.read(4 * num_dimensions))

    data = array.array(data_type, fd.read())
    data.byteswap()  # looks like array.array reads data as little endian

    expected_items = functools.reduce(operator.mul, dimension_sizes)
    if len(data) != expected_items:
        raise IdxDecodeError('IDX file has wrong number of items. '
                             'Expected: %d. Found: %d' % (expected_items, len(data)))

    return np.array(data).reshape(dimension_sizes)

def print_download_progress(count, block_size, total_size):
    pct_complete = int(count * block_size * 100 / total_size)
    pct_complete = min(pct_complete, 100)
    msg = "\r- Download progress: %d" % (pct_complete) + "%"
    sys.stdout.write(msg)
    sys.stdout.flush()


def download_and_parse_mnist_file(fname, target_dir=None, force=False):
    if not os.path.exists(DATASET_DIRECTORY):
        os.makedirs(DATASET_DIRECTORY)
    if not os.path.exists(DATASET_DIRECTORY+fname):
        print('Downloading '+fname)
        file_path = os.path.join(DATASET_DIRECTORY, fname)
        url = URL + fname
        file_path, _ = urlretrieve(url=url, filename=file_path, reporthook=print_download_progress)
        print("\nDownload finished.")

    fname = 'data/' + fname
    fopen = gzip.open if os.path.splitext(fname)[1] == '.gz' else open
    with fopen(fname, 'rb') as fd:
        return parse_idx(fd)


def train_images():
    return download_and_parse_mnist_file('train-images-idx3-ubyte.gz')


def test_images():
    return download_and_parse_mnist_file('t10k-images-idx3-ubyte.gz')


def train_labels():
    return download_and_parse_mnist_file('train-labels-idx1-ubyte.gz')


def test_labels():
    return download_and_parse_mnist_file('t10k-labels-idx1-ubyte.gz')

ImportError: ignored

In [21]:
#https://github.com/zishansami102/CNN-from-Scratch/blob/master/preprocessing.py
import numpy as np
import cv2
from scipy import ndimage
import math


def shift(img,sx,sy):
    rows,cols = img.shape
    M = np.float32([[1,0,sx],[0,1,sy]])
    shifted = cv2.warpAffine(img,M,(cols,rows))
    return shifted

def getBestShift(img):
    cy,cx = ndimage.measurements.center_of_mass(img)

    rows,cols = img.shape
    shiftx = np.round(cols/2.0-cx).astype(int)
    shifty = np.round(rows/2.0-cy).astype(int)

    return shiftx,shifty

def preprocess(img):
    img=255-np.array(img).reshape(28,28).astype(np.uint8)
    (thresh, gray) = cv2.threshold(img, 128, 255, cv2.THRESH_BINARY | cv2.THRESH_OTSU)

    while np.sum(gray[0]) == 0:
        gray = gray[1:]

    while np.sum(gray[:,0]) == 0:
        gray = np.delete(gray,0,1)

    while np.sum(gray[-1]) == 0:
        gray = gray[:-1]

    while np.sum(gray[:,-1]) == 0:
        gray = np.delete(gray,-1,1)

    rows,cols = gray.shape

    if rows > cols:
        factor = 20.0/rows
        rows = 20
        cols = int(round(cols*factor))
        gray = cv2.resize(gray, (cols,rows))
    else:
        factor = 20.0/cols
        cols = 20
        rows = int(round(rows*factor))
        gray = cv2.resize(gray, (cols, rows))

    colsPadding = (int(math.ceil((28-cols)/2.0)),int(math.floor((28-cols)/2.0)))
    rowsPadding = (int(math.ceil((28-rows)/2.0)),int(math.floor((28-rows)/2.0)))
    gray = np.lib.pad(gray,(rowsPadding,colsPadding),'constant')

    shiftx,shifty = getBestShift(gray)
    shifted = shift(gray,shiftx,shifty)
    gray = shifted

    img = gray.reshape(1,28,28).astype(np.float32)

    img-= int(33.3952)
    img/= int(78.6662)
    return img

In [22]:
import numpy as np 

class Convolution2D:

    def __init__(self, inputs_channel, num_filters, kernel_size, padding, stride, learning_rate, name):
        # weight size: (F, C, K, K)
        # bias size: (F) 
        self.F = num_filters
        self.K = kernel_size
        self.C = inputs_channel

        self.weights = np.zeros((self.F, self.C, self.K, self.K))
        self.bias = np.zeros((self.F, 1))
        for i in range(0,self.F):
            self.weights[i,:,:,:] = np.random.normal(loc=0, scale=np.sqrt(1./(self.C*self.K*self.K)), size=(self.C, self.K, self.K))

        self.p = padding
        self.s = stride
        self.lr = learning_rate
        self.name = name

    def zero_padding(self, inputs, size):
        w, h = inputs.shape[0], inputs.shape[1]
        new_w = 2 * size + w
        new_h = 2 * size + h
        out = np.zeros((new_w, new_h))
        out[size:w+size, size:h+size] = inputs
        return out

    def forward(self, inputs):
        # input size: (C, W, H)
        # output size: (N, F ,WW, HH)
        C = inputs.shape[0]
        W = inputs.shape[1]+2*self.p
        H = inputs.shape[2]+2*self.p
        self.inputs = np.zeros((C, W, H))
        for c in range(inputs.shape[0]):
            self.inputs[c,:,:] = self.zero_padding(inputs[c,:,:], self.p)
        WW = (W - self.K)/self.s + 1
        HH = (H - self.K)/self.s + 1
        feature_maps = np.zeros((self.F, WW, HH))
        for f in range(self.F):
            for w in range(0, WW, self.s):
                for h in range(0, HH, self.s):
                    feature_maps[f,w,h]=np.sum(self.inputs[:,w:w+self.K,h:h+self.K]*self.weights[f,:,:,:])+self.bias[f]

        return feature_maps

    def backward(self, dy):

        C, W, H = self.inputs.shape
        dx = np.zeros(self.inputs.shape)
        dw = np.zeros(self.weights.shape)
        db = np.zeros(self.bias.shape)

        F, W, H = dy.shape
        for f in range(F):
            for w in range(0, W, self.s):
                for h in range(0, H, self.s):
                    dw[f,:,:,:]+=dy[f,w,h]*self.inputs[:,w:w+self.K,h:h+self.K]
                    dx[:,w:w+self.K,h:h+self.K]+=dy[f,w,h]*self.weights[f,:,:,:]

        for f in range(F):
            db[f] = np.sum(dy[f, :, :])

        self.weights -= self.lr * dw
        self.bias -= self.lr * db
        return dx

    def extract(self):
        return {self.name+'.weights':self.weights, self.name+'.bias':self.bias}

    def feed(self, weights, bias):
        self.weights = weights
        self.bias = bias

class Maxpooling2D:

    def __init__(self, pool_size, stride, name):
        self.pool = pool_size
        self.s = stride
        self.name = name

    def forward(self, inputs):
        self.inputs = inputs
        C, W, H = inputs.shape
        new_width = (W - self.pool)/self.s + 1
        new_height = (H - self.pool)/self.s + 1
        out = np.zeros((C, new_width, new_height))
        for c in range(C):
            for w in range(W/self.s):
                for h in range(H/self.s):
                    out[c, w, h] = np.max(self.inputs[c, w*self.s:w*self.s+self.pool, h*self.s:h*self.s+self.pool])
        return out

    def backward(self, dy):
        C, W, H = self.inputs.shape
        dx = np.zeros(self.inputs.shape)
        
        for c in range(C):
            for w in range(0, W, self.pool):
                for h in range(0, H, self.pool):
                    st = np.argmax(self.inputs[c,w:w+self.pool,h:h+self.pool])
                    (idx, idy) = np.unravel_index(st, (self.pool, self.pool))
                    dx[c, w+idx, h+idy] = dy[c, w/self.pool, h/self.pool]
        return dx

    def extract(self):
        return 
    
class FullyConnected:

    def __init__(self, num_inputs, num_outputs, learning_rate, name):
        self.weights = 0.01*np.random.rand(num_inputs, num_outputs)
        self.bias = np.zeros((num_outputs, 1))
        self.lr = learning_rate
        self.name = name

    def forward(self, inputs):
        self.inputs = inputs
        return np.dot(self.inputs, self.weights) + self.bias.T

    def backward(self, dy):

        if dy.shape[0] == self.inputs.shape[0]:
            dy = dy.T
        dw = dy.dot(self.inputs)
        db = np.sum(dy, axis=1, keepdims=True)
        dx = np.dot(dy.T, self.weights.T)

        self.weights -= self.lr * dw.T
        self.bias -= self.lr * db

        return dx

    def extract(self):
        return {self.name+'.weights':self.weights, self.name+'.bias':self.bias}

    def feed(self, weights, bias):
        self.weights = weights
        self.bias = bias

class Flatten:
    def __init__(self):
        pass
    def forward(self, inputs):
        self.C, self.W, self.H = inputs.shape
        return inputs.reshape(1, self.C*self.W*self.H)
    def backward(self, dy):
        return dy.reshape(self.C, self.W, self.H)
    def extract(self):
        return

class ReLu:
    def __init__(self):
        pass
    def forward(self, inputs):
        self.inputs = inputs
        ret = inputs.copy()
        ret[ret < 0] = 0
        return ret
    def backward(self, dy):
        dx = dy.copy()
        dx[self.inputs < 0] = 0
        return dx
    def extract(self):
        return

class Softmax:
    def __init__(self):
        pass
    def forward(self, inputs):
        exp = np.exp(inputs, dtype=np.float)
        self.out = exp/np.sum(exp)
        return self.out
    def backward(self, dy):
        return self.out.T - dy.reshape(dy.shape[0],1)
    def extract(self):
        return



In [23]:
import numpy as np

# loss
def cross_entropy(inputs, labels):

    out_num = labels.shape[0]
    p = np.sum(labels.reshape(1,out_num)*inputs)
    loss = -np.log(p)
    return loss

In [24]:
import numpy as np
import pickle 
import sys
from time import *
#from model.loss import *
#from model.layers import *

class Net:
    def __init__(self):
        # Lenet
        # input: 28x28
        # conv1: (5x5x6)@s1p2 -> 28x28x6 {(28-5+2x2)/1+1}
        # maxpool2: (2x2)@s2 -> 14x14x6 {(28-2)/2+1}
        # conv3: (5x5x16)@s1p0 -> 10x10x16 {(14-5)/1+1}
        # maxpool4: (2x2)@s2 -> 5x5x16 {(10-2)/2+1}
        # conv5: (5x5x120)@s1p0 -> 1x1x120 {(5-5)/1+1}
        # fc6: 120 -> 84
        # fc7: 84 -> 10
        # softmax: 10 -> 10
        lr = 0.01
        self.layers = []
        self.layers.append(Convolution2D(inputs_channel=1, num_filters=6, kernel_size=5, padding=2, stride=1, learning_rate=lr, name='conv1'))
        self.layers.append(ReLu())
        self.layers.append(Maxpooling2D(pool_size=2, stride=2, name='maxpool2'))
        self.layers.append(Convolution2D(inputs_channel=6, num_filters=16, kernel_size=5, padding=0, stride=1, learning_rate=lr, name='conv3'))
        self.layers.append(ReLu())
        self.layers.append(Maxpooling2D(pool_size=2, stride=2, name='maxpool4'))
        self.layers.append(Convolution2D(inputs_channel=16, num_filters=120, kernel_size=5, padding=0, stride=1, learning_rate=lr, name='conv5'))
        self.layers.append(ReLu())
        self.layers.append(Flatten())
        self.layers.append(FullyConnected(num_inputs=120, num_outputs=84, learning_rate=lr, name='fc6'))
        self.layers.append(ReLu())
        self.layers.append(FullyConnected(num_inputs=84, num_outputs=10, learning_rate=lr, name='fc7'))
        self.layers.append(Softmax())
        self.lay_num = len(self.layers)

    def train(self, training_data, training_label, batch_size, epoch, weights_file):
        total_acc = 0
        for e in range(epoch):
            for batch_index in range(0, training_data.shape[0], batch_size):
                # batch input
                if batch_index + batch_size < training_data.shape[0]:
                    data = training_data[batch_index:batch_index+batch_size]
                    label = training_label[batch_index:batch_index + batch_size]
                else:
                    data = training_data[batch_index:training_data.shape[0]]
                    label = training_label[batch_index:training_label.shape[0]]
                loss = 0
                acc = 0
                start_time = time()
                for b in range(batch_size):
                    x = data[b]
                    y = label[b]
                    # forward pass
                    for l in range(self.lay_num):
                        output = self.layers[l].forward(x)
                        x = output
                    loss += cross_entropy(output, y)
                    if np.argmax(output) == np.argmax(y):
                        acc += 1
                        total_acc += 1
                    # backward pass
                    dy = y
                    for l in range(self.lay_num-1, -1, -1):
                        dout = self.layers[l].backward(dy)
                        dy = dout
                # time
                end_time = time()
                batch_time = end_time-start_time
                remain_time = (training_data.shape[0]*epoch-batch_index-training_data.shape[0]*e)/batch_size*batch_time
                hrs = int(remain_time)/3600
                mins = int((remain_time/60-hrs*60))
                secs = int(remain_time-mins*60-hrs*3600)
                # result
                loss /= batch_size
                batch_acc = float(acc)/float(batch_size)
                training_acc = float(total_acc)/float((batch_index+batch_size)*(e+1))
                print('=== Epoch: {0:d}/{1:d} === Iter:{2:d} === Loss: {3:.2f} === BAcc: {4:.2f} === TAcc: {5:.2f} === Remain: {6:d} Hrs {7:d} Mins {8:d} Secs ==='.format(e,epoch,batch_index+batch_size,loss,batch_acc,training_acc,int(hrs),int(mins),int(secs)))
        # dump weights and bias
        obj = []
        for i in range(self.lay_num):
            cache = self.layers[i].extract()
            obj.append(cache)
        with open(weights_file, 'wb') as handle:
            pickle.dump(obj, handle, protocol=pickle.HIGHEST_PROTOCOL)


    def test(self, data, label, test_size):
        toolbar_width = 40
        sys.stdout.write("[%s]" % (" " * (toolbar_width-1)))
        sys.stdout.flush()
        sys.stdout.write("\b" * (toolbar_width))
        step = float(test_size)/float(toolbar_width)
        st = 1
        total_acc = 0
        for i in range(test_size):
            if i == round(step):
                step += float(test_size)/float(toolbar_width)
                st += 1
                sys.stdout.write(".")
                #sys.stdout.write("%s]a"%(" "*(toolbar_width-st)))
                #sys.stdout.write("\b" * (toolbar_width-st+2))
                sys.stdout.flush()
            x = data[i]
            y = label[i]
            for l in range(self.lay_num):
                output = self.layers[l].forward(x)
                x = output
            if np.argmax(output) == np.argmax(y):
                total_acc += 1
        sys.stdout.write("\n")
        print('=== Test Size:{0:d} === Test Acc:{1:.2f} ==='.format(test_size, float(total_acc)/float(test_size)))


    def test_with_pretrained_weights(self, data, label, test_size, weights_file):
        with open(weights_file, 'rb') as handle:
            b = pickle.load(handle)
        self.layers[0].feed(b[0]['conv1.weights'], b[0]['conv1.bias'])
        self.layers[3].feed(b[3]['conv3.weights'], b[3]['conv3.bias'])
        self.layers[6].feed(b[6]['conv5.weights'], b[6]['conv5.bias'])
        self.layers[9].feed(b[9]['fc6.weights'], b[9]['fc6.bias'])
        self.layers[11].feed(b[11]['fc7.weights'], b[11]['fc7.bias'])
        toolbar_width = 40
        sys.stdout.write("[%s]" % (" " * (toolbar_width-1)))
        sys.stdout.flush()
        sys.stdout.write("\b" * (toolbar_width))
        step = float(test_size)/float(toolbar_width)
        st = 1
        total_acc = 0
        for i in range(test_size):
            if i == round(step):
                step += float(test_size)/float(toolbar_width)
                st += 1
                sys.stdout.write(".")
                #sys.stdout.write("%s]a"%(" "*(toolbar_width-st)))
                #sys.stdout.write("\b" * (toolbar_width-st+2))
                sys.stdout.flush()
            x = data[i]
            y = label[i]
            for l in range(self.lay_num):
                output = self.layers[l].forward(x)
                x = output
            if np.argmax(output) == np.argmax(y):
                total_acc += 1
        sys.stdout.write("\n")
        print('=== Test Size:{0:d} === Test Acc:{1:.2f} ==='.format(test_size, float(total_acc)/float(test_size)))
        
    def predict_with_pretrained_weights(self, inputs, weights_file):
        with open(weights_file, 'rb') as handle:
            b = pickle.load(handle)
        self.layers[0].feed(b[0]['conv1.weights'], b[0]['conv1.bias'])
        self.layers[3].feed(b[3]['conv3.weights'], b[3]['conv3.bias'])
        self.layers[6].feed(b[6]['conv5.weights'], b[6]['conv5.bias'])
        self.layers[9].feed(b[9]['fc6.weights'], b[9]['fc6.bias'])
        self.layers[11].feed(b[11]['fc7.weights'], b[11]['fc7.bias'])
        for l in range(self.lay_num):
            output = self.layers[l].forward(inputs)
            inputs = output
        digit = np.argmax(output)
        probability = output[0, digit]
        return digit, probability


       


In [27]:
from model.network import Net
from flask import Flask, jsonify, render_template, request
from preprocessing import *

app = Flask(__name__)
@app.route('/')
def index():
    return render_template('index.html')
@app.route('/digit_prediction', methods=['POST'])

def digit_prediction():
    if(request.method == "POST"):
        img = request.get_json()
        img = preprocess(img)
        net = Net()
        digit, probability = net.predict_with_pretrained_weights(img, 'pretrained_weights.pkl')
        data = { "digit":digit, "probability":float(int(probability*100))/100. }
        return jsonify(data)

if __name__ == "__main__":
    app.run(debug=True)

 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: on


INFO:werkzeug: * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)
INFO:werkzeug: * Restarting with stat


SystemExit: ignored

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [29]:
from keras.datasets import mnist
from keras.utils import np_utils
import numpy as np
import mnist
from model.network import Net

print('Loadind data......')
num_classes = 10
train_images = mnist.train_images() #[60000, 28, 28]
train_labels = mnist.train_labels()
test_images = mnist.test_images()
test_labels = mnist.test_labels()

print('Preparing data......')
train_images -= int(np.mean(train_images))
train_images /= int(np.std(train_images))
test_images -= int(np.mean(test_images))
test_images /= int(np.std(test_images))
training_data = train_images.reshape(60000, 1, 28, 28)
training_labels = np.eye(num_classes)[train_labels]
testing_data = test_images.reshape(10000, 1, 28, 28)
testing_labels = np.eye(num_classes)[test_labels]

net = Net()
print('Training Lenet......')
net.train(training_data, training_labels, 32, 1, 'weights.pkl')
print('Testing Lenet......')
net.test(testing_data, testing_labels, 100)
print('Testing with pretrained weights......')
net.test_with_pretrained_weights(testing_data, testing_labels, 100, 'pretrained_weights.pkl')

ModuleNotFoundError: ignored