In [1]:
import numpy as np

class ConvolutionLayer:
    """A 2-D convolution layer operating on one (C, W, H) sample at a time.

    The layer owns its parameters and applies a plain SGD step inside
    ``backward``.

    Attributes:
        weights: array of shape (num_filters, channel, width, height); each
            filter is a 3-D (channel, width, height) kernel.
        bias:    array of shape (num_filters, 1).
        inputs:  the zero-padded input cached by the last ``forward`` call.
    """

    def __init__(self, num_filters, inputs_channel, width, height, stride, padding, learning_rate, name):
        """
        num_filters:     number of convolution kernels (output channels)
        inputs_channel:  number of channels of the input volume
        width:           kernel width
        height:          kernel height
        stride:          step between neighbouring kernel positions
        padding:         number of zeros added on every side of each channel
        learning_rate:   step size used for the SGD update in ``backward``
        name:            layer name, used as key prefix in ``extract``
        """
        self.num_filters = num_filters
        self.channel = inputs_channel
        self.width = width
        self.height = height
        self.stride = stride
        self.padding = padding
        self.name = name
        self.lr = learning_rate

        # All kernels are stored in one 4-D array
        # (num_filters, channel, width, height).
        self.weights = np.zeros((self.num_filters, self.channel, self.width, self.height))
        self.bias = np.zeros((self.num_filters, 1))
        # Random init: zero mean, std = sqrt(1 / (channel*width*height)) to
        # keep the variance of the pre-activations small.
        scale = np.sqrt(1. / (self.channel * self.width * self.height))
        for i in range(self.num_filters):
            self.weights[i, :, :, :] = np.random.normal(
                loc=0, scale=scale, size=(self.channel, self.width, self.height))

    def zero_padding(self, inputs, padding_size):
        """Return a copy of the 2-D array ``inputs`` with ``padding_size`` zeros on every side."""
        w, h = inputs.shape[0], inputs.shape[1]
        new_w = 2 * padding_size + w
        new_h = 2 * padding_size + h
        out = np.zeros((new_w, new_h))
        # Copy the original values into the centre of the enlarged array.
        out[padding_size:w + padding_size, padding_size:h + padding_size] = inputs
        return out

    def forward(self, inputs):
        """Convolve one sample.

        inputs:  array of shape (C, W, H)
        returns: feature maps of shape (num_filters, WW, HH) where
                 WW = (W + 2*padding - width) // stride + 1 (HH analogous).
        """
        C = inputs.shape[0]
        W = inputs.shape[1] + 2 * self.padding
        H = inputs.shape[2] + 2 * self.padding
        # Cache the padded input; backward() needs it for the weight gradient.
        self.inputs = np.zeros((C, W, H))
        for c in range(C):
            self.inputs[c, :, :] = self.zero_padding(inputs[c, :, :], self.padding)

        WW = (W - self.width) // self.stride + 1
        HH = (H - self.height) // self.stride + 1
        feature_maps = np.zeros((self.num_filters, WW, HH))
        # Flat view so that a scalar (not a length-1 array) is added per filter.
        bias = np.reshape(self.bias, -1)
        for f in range(self.num_filters):
            for w in range(WW):
                w0 = w * self.stride  # top-left corner of the window honours the stride
                for h in range(HH):
                    h0 = h * self.stride
                    window = self.inputs[:, w0:w0 + self.width, h0:h0 + self.height]
                    feature_maps[f, w, h] = np.sum(window * self.weights[f, :, :, :]) + bias[f]

        return feature_maps

    def backward(self, dy):
        """Back-propagate through the layer and apply one SGD step.

        dy:      gradient w.r.t. the output of ``forward``, shape (F, WW, HH)
        returns: gradient w.r.t. the (unpadded) input passed to ``forward``
        """
        dx = np.zeros(self.inputs.shape)  # gradient w.r.t. the padded input
        dw = np.zeros(self.weights.shape)

        F, WW, HH = dy.shape
        for f in range(F):
            for w in range(WW):
                w0 = w * self.stride
                for h in range(HH):
                    h0 = h * self.stride
                    grad = dy[f, w, h]
                    # Each output element saw exactly this window and this kernel.
                    dw[f, :, :, :] += grad * self.inputs[:, w0:w0 + self.width, h0:h0 + self.height]
                    dx[:, w0:w0 + self.width, h0:h0 + self.height] += grad * self.weights[f, :, :, :]

        # The bias is added to every position of its feature map.
        db = np.sum(dy, axis=(1, 2)).reshape(self.bias.shape)

        self.weights -= self.lr * dw
        self.bias -= self.lr * db

        # Drop the gradient of the artificial zero border so the result matches
        # the shape of the tensor the previous layer produced.
        if self.padding > 0:
            p = self.padding
            dx = dx[:, p:-p, p:-p]
        return dx

    def extract(self):
        """Return the parameters as a dict keyed by '<name>.weights' / '<name>.bias'."""
        return {self.name+'.weights':self.weights, self.name+'.bias':self.bias}

    def feed(self, weights, bias):
        """Replace the layer parameters with the given arrays (no copy is made)."""
        self.weights = weights
        self.bias = bias

        

In [2]:
class ReLu:
    """Element-wise rectified linear unit: negative activations are clamped to 0."""

    def __init__(self):
        pass

    def forward(self, inputs):
        """Cache ``inputs`` and return a new array with every negative entry set to 0."""
        self.inputs = inputs
        negative = inputs < 0
        return np.where(negative, 0, inputs)

    def backward(self, dy):
        """Return a copy of ``dy`` zeroed wherever the cached forward input was negative."""
        blocked = self.inputs < 0
        return np.where(blocked, 0, dy)

    def extract(self):
        """The layer has no parameters; returns None."""
        return None

In [3]:
import numpy as np
class MaxPoolingLayer:
    """Max pooling over (width x height) windows of a single (C, W, H) sample."""

    def __init__(self, width, height, stride, name):
        """
        width, height: size of the pooling window
        stride:        step between neighbouring windows
        name:          layer name (stored only)
        """
        self.width = width
        self.height = height
        self.stride = stride
        self.name = name

    def forward(self, inputs):
        """Return the per-window maxima.

        inputs:  array of shape (C, W, H); cached for ``backward``
        returns: array of shape (C, (W - width)//stride + 1, (H - height)//stride + 1)
        """
        self.inputs = inputs
        C, W, H = inputs.shape
        new_width = (W - self.width) // self.stride + 1
        new_height = (H - self.height) // self.stride + 1
        out = np.zeros((C, new_width, new_height))
        for c in range(C):
            for w in range(new_width):
                w0 = w * self.stride
                for h in range(new_height):
                    h0 = h * self.stride
                    out[c, w, h] = np.max(
                        self.inputs[c, w0:w0 + self.width, h0:h0 + self.height])
        return out

    def backward(self, dy):
        """Route each output gradient to the arg-max position of its window.

        dy:      gradient w.r.t. the output of ``forward``, shape (C, new_width, new_height)
        returns: gradient w.r.t. the cached input, same shape as that input
        """
        dx = np.zeros(self.inputs.shape)
        # Walk the same windows as forward(): driven by dy's shape and the
        # stride, so trailing rows/columns that no window covers are skipped.
        C, out_w, out_h = dy.shape
        for c in range(C):
            for w in range(out_w):
                w0 = w * self.stride
                for h in range(out_h):
                    h0 = h * self.stride
                    window = self.inputs[c, w0:w0 + self.width, h0:h0 + self.height]
                    # argmax works on the flattened window; convert back to 2-D.
                    (idx, idy) = np.unravel_index(np.argmax(window), window.shape)
                    # Accumulate: with stride < window size a cell can win in
                    # several overlapping windows.
                    dx[c, w0 + idx, h0 + idy] += dy[c, w, h]
        return dx

    def extract(self):
        """The layer has no parameters; returns None."""
        return

In [4]:
import numpy as np

# loss
def cross_entropy(inputs, labels):
    """Negative log of the probability mass that ``inputs`` puts on ``labels``.

    inputs: predicted values, broadcastable against a (1, n) row
    labels: 1-D array of length n used as weights (one-hot in this notebook)
    """
    n_classes = labels.shape[0]
    label_row = labels.reshape(1, n_classes)
    # Weighted sum of the predictions; for a one-hot label this picks one entry.
    picked = (label_row * inputs).sum()
    return -np.log(picked)

In [5]:
class Flatten:
    """Reshape a (C, W, H) volume into a (1, C*W*H) row and back."""

    def __init__(self):
        pass

    def forward(self, inputs):
        """Remember the 3-D shape of ``inputs`` and return it as a single row."""
        channels, width, height = inputs.shape
        self.C, self.W, self.H = channels, width, height
        return inputs.reshape(1, channels * width * height)

    def backward(self, dy):
        """Reshape ``dy`` to the 3-D shape recorded by the last ``forward`` call."""
        original_shape = (self.C, self.W, self.H)
        return dy.reshape(*original_shape)

    def extract(self):
        """The layer has no parameters; returns None."""
        return None

In [6]:
import numpy as np
class FullyConnectedLayer:
    """Dense layer ``y = x @ W + b^T`` for a single row-vector sample, trained by SGD."""

    def __init__(self, num_inputs, num_outputs, learning_rate, name):
        """
        num_inputs:    length of the input row vector
        num_outputs:   length of the output row vector
        learning_rate: step size used in ``backward``
        name:          key prefix used by ``extract``
        """
        self.name = name
        self.lr = learning_rate
        # Small uniform values in [0, 0.01); bias starts at zero.
        self.weights = 0.01*np.random.rand(num_inputs, num_outputs)
        self.bias = np.zeros((num_outputs, 1))

    def forward(self, inputs):
        """Cache ``inputs`` and return ``inputs . weights + bias^T``."""
        self.inputs = inputs
        projected = np.dot(inputs, self.weights)
        return projected + self.bias.T

    def backward(self, dy):
        """Back-propagate ``dy``, update the parameters in place and return the input gradient.

        ``dy`` is transposed first when its leading dimension equals that of the
        cached input, so that the gradient is handled as a column.
        """
        grad = dy.T if dy.shape[0] == self.inputs.shape[0] else dy

        weight_grad = np.dot(grad, self.inputs)
        bias_grad = grad.sum(axis=1, keepdims=True)
        # Input gradient uses the weights from before the update below.
        input_grad = np.dot(grad.T, self.weights.T)

        self.weights -= self.lr * weight_grad.T
        self.bias -= self.lr * bias_grad

        return input_grad

    def extract(self):
        """Return the parameters as a dict keyed by '<name>.weights' / '<name>.bias'."""
        params = {}
        params[self.name+'.weights'] = self.weights
        params[self.name+'.bias'] = self.bias
        return params

    def feed(self, weights, bias):
        """Replace the layer parameters with the given arrays (no copy is made)."""
        self.weights, self.bias = weights, bias

In [7]:
import numpy as np
class Softmax:
    """Softmax output layer whose ``backward`` is fused with the cross-entropy loss."""

    def __init__(self):
        pass

    def forward(self, inputs):
        """Return ``exp(inputs) / sum(exp(inputs))`` over all elements of ``inputs``.

        The result is cached in ``self.out`` for ``backward``.
        """
        # np.float was removed in NumPy 1.24; request float64 explicitly.
        scores = np.asarray(inputs, dtype=np.float64)
        # Softmax is invariant to a constant shift; subtracting the maximum
        # keeps exp() from overflowing to inf (and the result from becoming NaN).
        exp = np.exp(scores - np.max(scores))
        self.out = exp/np.sum(exp)
        return self.out

    def backward(self, dy):
        """Return ``out^T - dy`` as a column.

        ``dy`` is reshaped to (len(dy), 1) before the subtraction. The network
        in this notebook passes the one-hot label here, which makes the result
        the gradient of the cross-entropy loss w.r.t. the softmax input.
        """
        return self.out.T - dy.reshape(dy.shape[0],1)

    def extract(self):
        """The layer has no parameters; returns None."""
        return


In [8]:
import numpy as np
import pickle
import sys
import time


class Net:
    """LeNet-style stack of the layers defined in this notebook.

    conv(6@5x5, pad 2) -> ReLU -> maxpool(2x2) -> conv(16@5x5) -> ReLU ->
    maxpool(2x2) -> conv(120@5x5) -> ReLU -> flatten -> fc(120->60) -> ReLU ->
    fc(60->2) -> softmax.

    The first fully connected layer expects 120 inputs, i.e. the third
    convolution must produce a 1x1 map, which is the case for 28x28 inputs.
    """

    def __init__(self):
        lr = 0.01
        self.layers = [
            ConvolutionLayer(inputs_channel=1, num_filters=6, width=5, height=5, padding=2, stride=1,
                             learning_rate=lr, name='conv1'),
            ReLu(),
            MaxPoolingLayer(width=2, height=2, stride=2, name='maxpool2'),
            ConvolutionLayer(inputs_channel=6, num_filters=16, width=5, height=5, padding=0, stride=1,
                             learning_rate=lr, name='conv3'),
            ReLu(),
            MaxPoolingLayer(width=2, height=2, stride=2, name='maxpool4'),
            ConvolutionLayer(inputs_channel=16, num_filters=120, width=5, height=5, padding=0, stride=1,
                             learning_rate=lr, name='conv5'),
            ReLu(),
            Flatten(),
            FullyConnectedLayer(num_inputs=120, num_outputs=60, learning_rate=lr, name='fc6'),
            ReLu(),
            FullyConnectedLayer(num_inputs=60, num_outputs=2, learning_rate=lr, name='fc7'),
            Softmax(),
        ]
        self.lay_num = len(self.layers)

    def _forward(self, x):
        """Run ``x`` through every layer in order and return the final output."""
        for layer in self.layers:
            x = layer.forward(x)
        return x

    def train(self, training_data, training_label, batch_size, epoch):
        """Train sample by sample (each layer updates itself in ``backward``).

        training_data:  array indexed by sample along axis 0
        training_label: array of one-hot labels, aligned with ``training_data``
        batch_size:     number of samples grouped for the reported statistics
                        (parameters are still updated after every sample)
        epoch:          number of passes over the data

        Returns a list with one dict per batch holding 'epoch', 'batch_index',
        'loss' (mean per sample), 'batch_acc', 'training_acc' (running accuracy
        over all samples seen so far) and 'eta' as an (hours, minutes, seconds)
        tuple estimated from the duration of that batch.
        """
        num_samples = training_data.shape[0]
        total_acc = 0
        seen = 0
        history = []
        for e in range(epoch):
            for batch_index in range(0, num_samples, batch_size):
                # Slicing clamps at the end, so the last batch may be shorter.
                data = training_data[batch_index:batch_index + batch_size]
                label = training_label[batch_index:batch_index + batch_size]
                loss = 0
                acc = 0
                start_time = time.time()

                for b in range(len(data)):
                    y = label[b]
                    output = self._forward(data[b])
                    loss += cross_entropy(output, y)
                    if np.argmax(output) == np.argmax(y):
                        acc += 1
                        total_acc += 1
                    # The softmax layer consumes the label itself and emits the
                    # loss gradient; every earlier layer receives a gradient.
                    dy = y
                    for layer in reversed(self.layers):
                        dy = layer.backward(dy)

                batch_time = time.time() - start_time
                seen += len(data)
                remain_time = (num_samples * epoch - seen) / batch_size * batch_time
                # Integer arithmetic: hours/minutes/seconds must be whole numbers.
                hrs, rest = divmod(int(remain_time), 3600)
                mins, secs = divmod(rest, 60)

                # Average over the samples actually processed, not the nominal
                # batch size, so a short final batch is not under-reported.
                history.append({
                    'epoch': e,
                    'batch_index': batch_index,
                    'loss': loss / len(data),
                    'batch_acc': float(acc) / float(len(data)),
                    'training_acc': float(total_acc) / float(seen),
                    'eta': (hrs, mins, secs),
                })
        return history

    def test(self, data, label, test_size):
        """Return the fraction of the first ``test_size`` samples classified correctly.

        A prediction counts as correct when the arg-max of the network output
        equals the arg-max of the label. Returns 0.0 when ``test_size`` is not
        positive.
        """
        if test_size <= 0:
            return 0.0
        total_acc = 0
        for i in range(test_size):
            output = self._forward(data[i])
            if np.argmax(output) == np.argmax(label[i]):
                total_acc += 1
        return float(total_acc) / float(test_size)

   

In [None]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Nov 17 15:03:57 2020

@author: MoHan
"""

# libraries needed
import os

import math

import matplotlib.pyplot as plt
import numpy 

# note: if tensorflow is not install, run "pip install --upgrade tensorflow"
# from tensorflow.keras.preprocessing.image import ImageDataGenerator
import cv2
from PIL import Image
import random

# Root folders of the two dataset splits.
test_dir = "./dataset/test_set"
train_dir = "./dataset/training_set"

# Each split holds one sub-folder per class.
train_dir_cats, train_dir_dogs = (train_dir + sub for sub in ("/cats", "/dogs"))
test_dir_cats, test_dir_dogs = (test_dir + sub for sub in ("/cats", "/dogs"))

# Filled in place by read_training_data / read_testing_data.
train_data, train_data_label = [], []
test_data, test_data_label = [], []

# Only transformed to gray pic
def normal_transform (imgpath):
    """Load an image, convert it to grayscale and resize it to 28x28.

    imgpath: path of the image file
    returns: a PIL image built from the resized grayscale array
    raises:  IOError if OpenCV cannot read the file
    """
    img = cv2.imread(imgpath)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise IOError("cannot read image: " + str(imgpath))
    # cv2.imread yields channels in BGR order, so BGR2GRAY applies the
    # luminance weights to the right channels.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    img = cv2.resize(img, (28,28))
    return Image.fromarray(img)

# Preprocessed using gaussian_canny
def gaussian_canny_transform (imgpath):
    """Load an image and return its 28x28 Canny edge map.

    Pipeline: grayscale -> 3x3 Gaussian blur -> Canny (both thresholds 50)
    -> resize to 28x28.

    imgpath: path of the image file
    returns: a PIL image built from the resized edge map
    raises:  IOError if OpenCV cannot read the file
    """
    img = cv2.imread(imgpath)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise IOError("cannot read image: " + str(imgpath))
    # cv2.imread yields channels in BGR order.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    # The blur keeps the 8-bit depth of its input, which is what Canny expects.
    gaussian = cv2.GaussianBlur(img, (3,3), 0)
    canny = cv2.Canny(gaussian, 50, 50)
    canny = cv2.resize(canny, (28,28))
    return Image.fromarray(canny)

# Preprocessed using sobel
def sobel_transform (imgpath):
    """Load an image and return a binary Sobel edge map of the same size.

    A pixel (i, j) is set to 255 when the gradient magnitude of the 3x3
    neighbourhood whose top-left corner is (i, j) exceeds 50, else 0. The last
    two rows and columns have no complete neighbourhood and stay 0.

    imgpath: path of the image file
    returns: a PIL image built from the uint8 edge map
    raises:  IOError if OpenCV cannot read the file
    """
    img = cv2.imread(imgpath)
    if img is None:
        # cv2.imread signals failure by returning None instead of raising.
        raise IOError("cannot read image: " + str(imgpath))
    # cv2.imread yields channels in BGR order.
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    height, weight = img.shape
    # Work in a signed, wide type: sums and differences of uint8 pixels would
    # otherwise wrap around.
    g = img.astype(numpy.int32)

    # Row/column slices of the 3x3 neighbourhood anchored at its top-left pixel.
    top, mid, bottom = g[:-2, :], g[1:-1, :], g[2:, :]
    gy = (top[:, :-2] + 2 * top[:, 1:-1] + top[:, 2:]
          - bottom[:, :-2] - 2 * bottom[:, 1:-1] - bottom[:, 2:])
    gx = (top[:, :-2] - top[:, 2:]
          + 2 * mid[:, :-2] - 2 * mid[:, 2:]
          + bottom[:, :-2] - bottom[:, 2:])
    grad = numpy.sqrt(gx * gx + gy * gy)

    sobel = numpy.zeros((height, weight), numpy.uint8)
    sobel[:grad.shape[0], :grad.shape[1]] = numpy.where(grad > 50, 255, 0)
    return Image.fromarray(sobel)

# Reading training data
def read_training_data(train_data, train_data_label, dir, label):
    """Append every image found in ``dir`` (and its label) to the given lists.

    train_data:       list extended in place with ``[array]`` per image, the
                      array coming from ``normal_transform``
    train_data_label: list extended in place with ``label`` once per image
    dir:              folder whose entries are all treated as image files
    label:            label object stored for every image of this folder
    """
    for entry in os.listdir(dir):
        picture = normal_transform("/".join((dir, entry)))
        # Wrap in a list so each sample gets a leading channel axis of size 1.
        train_data.append([numpy.asarray(picture)])
        train_data_label.append(label)

# Reading testing data
def read_testing_data(test_data, test_data_label, dir, label):
    """Append every image found in ``dir`` (and its label) to the given lists.

    test_data:       list extended in place with ``[array]`` per image, the
                     array coming from ``normal_transform``
    test_data_label: list extended in place with ``label`` once per image
    dir:             folder whose entries are all treated as image files
    label:           label object stored for every image of this folder
    """
    for entry in os.listdir(dir):
        picture = normal_transform("/".join((dir, entry)))
        # Wrap in a list so each sample gets a leading channel axis of size 1.
        test_data.append([numpy.asarray(picture)])
        test_data_label.append(label)

#read gray images into train_data and train_data_label
# Sizes of the subsets actually used; the full folders are read first.
TRAIN_SUBSET = slice(0, 500)
TEST_SUBSET = slice(1000, 1200)


def _swap_with_mirror(samples, labels, step):
    """Swap every ``step``-th entry of the first half with its mirror entry.

    Entry ``i`` is exchanged with entry ``len(samples) - 1 - i`` in both lists.
    Each folder is read class by class, so this deterministically moves
    samples of the second class in between samples of the first one.
    """
    last = len(samples) - 1
    for i in range(0, len(samples) // 2, step):
        j = last - i
        samples[i], samples[j] = samples[j], samples[i]
        labels[i], labels[j] = labels[j], labels[i]


# Read the gray images: cats are labelled [0,1], dogs [1,0].
read_training_data(train_data, train_data_label, train_dir_cats, [0,1])
read_training_data(train_data, train_data_label, train_dir_dogs, [1,0])
_swap_with_mirror(train_data, train_data_label, 2)

read_testing_data(test_data, test_data_label, test_dir_cats, [0,1])
read_testing_data(test_data, test_data_label, test_dir_dogs, [1,0])
_swap_with_mirror(test_data, test_data_label, 3)

# Keep only a small subset so the pure-NumPy network finishes in reasonable time.
train_data = numpy.array(train_data[TRAIN_SUBSET])
train_data_label = numpy.array(train_data_label[TRAIN_SUBSET])
test_data = numpy.array(test_data[TEST_SUBSET])
test_data_label = numpy.array(test_data_label[TEST_SUBSET])

LeNet = Net()

LeNet.train(training_data=train_data,training_label=train_data_label,batch_size=20,epoch=1)

# Evaluate on exactly the samples that were loaded; a hard-coded count would
# index past the end when the test folders hold fewer images than expected.
accuracy = LeNet.test(data=test_data,label=test_data_label,test_size=len(test_data))
#print('Testing accuracy:'+str(accuracy))


In [None]:
# Write the test accuracy (as a percentage) to a text file.
# The context manager closes the file even if the write fails.
with open("CNN_result.txt", "w") as f:
    f.write('Testing accuracy:'+str(accuracy*100)+'%')