In [1]:
import h5py
import numpy as np
import math
import matplotlib.pyplot as plt
filename = "MNISTdata.hdf5"

# Read every dataset inside a context manager so the HDF5 handle is released
# even when a key is missing or a read fails part-way through the cell.
with h5py.File(filename, 'r') as myfile:
    X_train = np.array(myfile["x_train"])
    Y_train = np.array(myfile['y_train'])
    X_test = np.array(myfile['x_test'])
    Y_test = np.array(myfile['y_test'])

# Reshape the image arrays to (n_samples, 28, 28); the sample count is taken
# from the data itself.
X_train = X_train.reshape((X_train.shape[0], 28, 28))
X_test = X_test.reshape((X_test.shape[0], 28, 28))

# Flat copy of the raw training labels. Y_train is replaced by a one-hot
# matrix further down, while the train-accuracy cell compares against this.
Y_train_1 = Y_train.flatten()

In [2]:
def convert_one_hot_matrix(labels, num_classes=None):
    """Encode class indices as a one-hot matrix with one column per sample.

    Parameters
    ----------
    labels : array-like
        Class indices in any shape; the input is flattened first. Values must
        be non-negative whole numbers (float-typed arrays such as
        ``[3., 0.]`` are accepted and cast to integers).
    num_classes : int, optional
        Number of rows of the result. Defaults to ``max(labels) + 1``.

    Returns
    -------
    numpy.ndarray
        Float array of shape ``(num_classes, n_samples)`` holding 1.0 at
        ``[labels[k], k]`` and 0.0 elsewhere. An empty input yields an array
        with zero columns.

    Raises
    ------
    ValueError
        If a label is fractional, negative, or not smaller than
        ``num_classes``.
    """
    labels = np.asarray(labels).flatten()
    n_samples = labels.shape[0]
    if n_samples == 0:
        return np.zeros((0 if num_classes is None else int(num_classes), 0))

    # Fancy indexing needs an integer dtype; reject values that would be
    # silently truncated by the cast.
    class_idx = labels.astype(np.intp)
    if not np.array_equal(class_idx, labels):
        raise ValueError("labels must be whole numbers")
    # A negative index would wrap around and mark the wrong row.
    if class_idx.min() < 0:
        raise ValueError("labels must be non-negative")

    largest = int(class_idx.max())
    n_classes = largest + 1 if num_classes is None else int(num_classes)
    if largest >= n_classes:
        raise ValueError("labels must be smaller than num_classes")

    one_hot = np.zeros((n_classes, n_samples))
    one_hot[class_idx, np.arange(n_samples)] = 1.
    return one_hot
# Encode from the untouched flat label copy (Y_train_1) rather than from
# Y_train itself: re-running this cell then always produces the same
# (classes, samples) matrix instead of encoding an already encoded array.
Y_train=convert_one_hot_matrix(Y_train_1)
Y_train.shape

(10, 60000)

In [41]:
from itertools import product
class convolution_net:
    """One-convolution-layer image classifier trained by plain gradient descent.

    Pipeline: valid, stride-1 cross-correlation with ``N_filter`` kernels
    (no bias) -> ReLU -> flatten -> dense layer with ``N_CLASSES`` outputs
    -> softmax.

    Parameters
    ----------
    fw, fh : int
        Kernel width and height.
    N_filter : int
        Number of convolution kernels.
    X_train : numpy.ndarray
        Images of shape (n_samples, height, width). Only the spatial size is
        used here; the array itself is stored as ``self.X``.
    Y_train : numpy.ndarray
        Targets, stored as ``self.Y``. ``compute_cost`` and
        ``backward_propagation`` expect one-hot columns, i.e. shape
        (N_CLASSES, n_samples).
    """

    # Number of outputs of the dense layer.
    N_CLASSES = 10

    def __init__(self,fw,fh,N_filter,X_train,Y_train):
        self.N_filter = N_filter
        self.fh = fh
        self.fw = fw

        # Convolution kernels and their gradient, shape (fh, fw, N_filter).
        self.Wc = np.random.randn(self.fh, self.fw, self.N_filter) * 0.01
        self.dWc = np.zeros((self.fh, self.fw, self.N_filter))

        # Spatial size of a "valid" convolution output.
        _, self.Nh, self.Nw = X_train.shape
        self.n_H = self.Nh - self.fh + 1
        self.n_W = self.Nw - self.fw + 1

        # Dense layer acting on the flattened feature maps.
        n_features = self.n_H * self.n_W * self.N_filter
        self.W1 = 0.01 * np.random.randn(self.N_CLASSES, n_features)
        self.dW1 = np.zeros((self.N_CLASSES, n_features))
        self.b1 = 0.01 * np.random.randn(self.N_CLASSES, 1)
        self.db1 = np.zeros((self.N_CLASSES, 1))

        self.X = X_train
        self.Y = Y_train
        self.costs = []

    def convolve(self,X,K):
        """Valid, stride-1 cross-correlation of the 2-D array ``X`` with ``K``.

        Returns a float array of shape
        ``(X.shape[0]-K.shape[0]+1, X.shape[1]-K.shape[1]+1)`` whose entry
        ``[i, j]`` is ``sum(X[i:i+kh, j:j+kw] * K)``.
        """
        # Work in float64 so the accumulation precision does not depend on
        # the input dtypes.
        X = np.asarray(X, dtype=np.float64)
        K = np.asarray(K, dtype=np.float64)
        kh, kw = K.shape
        out_h = X.shape[0] - kh + 1
        out_w = X.shape[1] - kw + 1
        results = np.zeros((out_h, out_w))

        # Iterate over whichever index set is smaller. The forward pass has a
        # small kernel and a large output; the backward pass is the opposite.
        if kh * kw <= out_h * out_w:
            for a, b in product(range(kh), range(kw)):
                results += K[a, b] * X[a:a + out_h, b:b + out_w]
        else:
            for i, j in product(range(out_h), range(out_w)):
                results[i, j] = np.sum(X[i:i + kh, j:j + kw] * K)
        return results

    def forward_propagation(self,X_train):
        """Run the network on a batch and return the class probabilities.

        ``X_train`` has shape (m, height, width). The result, also stored as
        ``self.A_1``, has shape (N_CLASSES, m). ``self.m``, ``self.Z_out``
        and the flattened activations ``self.A_0`` are cached for the
        backward pass.
        """
        self.m = X_train.shape[0]
        self.Z_out = np.zeros((self.m, self.n_H, self.n_W, self.N_filter))
        for i in range(self.m):
            for j in range(self.N_filter):
                self.Z_out[i, :, :, j] = self.convolve(X_train[i, :, :], self.Wc[:, :, j])

        # ReLU, then flatten every sample to a feature row.
        A_0 = np.where(self.Z_out > 0, self.Z_out, 0)
        self.A_0 = A_0.reshape((self.m, self.n_H * self.n_W * self.N_filter))

        Z_1 = np.dot(self.W1, self.A_0.T) + self.b1

        # Softmax with the per-column maximum subtracted first: the result is
        # mathematically unchanged, but exp() can no longer overflow.
        shifted = Z_1 - np.max(Z_1, axis=0, keepdims=True)
        exp_scores = np.exp(shifted)
        self.A_1 = exp_scores / np.sum(exp_scores, axis=0, keepdims=True)
        assert self.A_1.shape == (self.N_CLASSES, self.m)
        return self.A_1

    def compute_cost(self,Y_train):
        """Mean cross-entropy of the last forward pass against one-hot ``Y_train``."""
        # Check the shapes before they are multiplied together.
        assert (Y_train.shape[0] == self.A_1.shape[0])
        # A probability that underflowed to exactly 0 would give 0 * -inf = nan;
        # flooring at the smallest normal float leaves all other values intact.
        floor = np.finfo(self.A_1.dtype).tiny
        log_probs = np.log(np.maximum(self.A_1, floor))
        cost = -1. / self.m * np.sum(Y_train * log_probs)
        return cost

    def backward_propagation(self,Y_train,X_train):
        """Compute ``dW1``, ``db1`` and ``dWc`` for the last forward pass.

        Note that the gradients are those of the *summed* batch loss (they
        are not divided by ``m``), whereas ``compute_cost`` returns the mean.
        The cached activations are left untouched, so this method can be
        called repeatedly after a single forward pass.
        """
        # Softmax + cross-entropy gradient with respect to the logits.
        dZ_1 = self.A_1 - Y_train
        self.dW1 = np.dot(dZ_1, self.A_0)
        self.db1 = np.sum(dZ_1, axis=1, keepdims=True)

        # Back through the dense layer and the ReLU, in feature-map layout.
        map_shape = (self.m, self.n_H, self.n_W, self.N_filter)
        dA_0 = np.dot(self.W1.T, dZ_1).T.reshape(map_shape)
        relu_active = self.A_0.reshape(map_shape) > 0
        dZ_out = np.where(relu_active, 1., 0.) * dA_0

        # Kernel gradient: correlate each image with its upstream gradient
        # map and accumulate over the batch.
        dWc = np.zeros((self.fh, self.fw, self.N_filter))
        for i in range(self.m):
            image = X_train[i, :, :]
            for j in range(self.N_filter):
                dWc[:, :, j] += self.convolve(image, dZ_out[i, :, :, j])
        self.dWc = dWc
        return 1

    def update_parameters(self,learning_rate):
        """Take one gradient-descent step on ``W1``, ``b1`` and ``Wc``."""
        self.W1 = self.W1 - learning_rate * self.dW1
        self.b1 = self.b1 - learning_rate * self.db1
        self.Wc = self.Wc - learning_rate * self.dWc
        return 1

In [42]:
def random_mini_batches(X, Y, mini_batch_size = 64):
    """Shuffle (X, Y) together and split them into mini-batches.

    Parameters
    ----------
    X : numpy.ndarray
        Samples of shape (m, height, width).
    Y : numpy.ndarray
        Targets of shape (n_classes, m); column k belongs to sample k.
    mini_batch_size : int
        Nominal batch size, at least 1.

    Returns
    -------
    list of (mini_batch_X, mini_batch_Y) tuples
        ``max(1, m // mini_batch_size)`` batches that together contain every
        sample exactly once. Samples left over after the last complete batch
        are folded into that batch, so the final batch may hold up to
        ``2 * mini_batch_size - 1`` samples. When ``m`` is smaller than
        ``mini_batch_size`` a single batch with all samples is returned, and
        an empty list when there are no samples at all.

    Raises
    ------
    ValueError
        If ``mini_batch_size`` is smaller than 1.
    """
    if mini_batch_size < 1:
        raise ValueError("mini_batch_size must be at least 1")

    m = X.shape[0]                  # number of training examples
    if m == 0:
        return []

    # Step 1: shuffle X and Y with the same permutation so pairs stay aligned.
    permutation = np.random.permutation(m)
    shuffled_X = X[permutation,:,:]
    shuffled_Y = Y[:,permutation].reshape((Y.shape[0],m))

    # Step 2: cut at multiples of the batch size; the final boundary is m, so
    # the last batch absorbs the remainder. At least one batch is always made.
    num_batches = max(1, m // mini_batch_size)
    boundaries = [k*mini_batch_size for k in range(num_batches)]
    boundaries.append(m)

    return [
        (shuffled_X[start:stop,:,:], shuffled_Y[:,start:stop])
        for start, stop in zip(boundaries[:-1], boundaries[1:])
    ]

In [43]:
def initialize(X_train,Y_train,fw=4,fh=4,N_filter=3):
    """Build an untrained ``convolution_net`` for the given data.

    Parameters
    ----------
    X_train, Y_train : numpy.ndarray
        Training images and targets, passed to the network unchanged.
    fw, fh : int
        Kernel width and height (default 4 x 4).
    N_filter : int
        Number of convolution kernels (default 3).

    Returns
    -------
    convolution_net
        A freshly initialised network.
    """
    mynet=convolution_net(fw,fh,N_filter,X_train, Y_train)
    return mynet

def train(network,learning_rate = 0.001,
          num_epochs = 20, mini_batch_size = 256, print_cost = True):
    """Train ``network`` with mini-batch gradient descent and plot the cost.

    Parameters
    ----------
    network : convolution_net
        The network to train, updated in place. Its ``X`` and ``Y``
        attributes supply the training data.
    learning_rate : float
        Step size passed to ``update_parameters``.
    num_epochs : int
        Number of passes over the training data.
    mini_batch_size : int
        Batch size passed to ``random_mini_batches``.
    print_cost : bool
        When true, print the cost on every second epoch and on the last one.

    Returns
    -------
    convolution_net
        The same ``network`` object. One averaged cost per epoch is appended
        to ``network.costs``.
    """
    X_train = network.X
    Y_train = network.Y

    # Optimization loop
    for i in range(num_epochs):
        # Draw a fresh shuffle every epoch so the batches differ between passes.
        minibatches = random_mini_batches(X_train, Y_train, mini_batch_size)
        cost = 0
        for minibatch_X, minibatch_Y in minibatches:
            # Forward pass caches the activations used by the next two calls.
            network.forward_propagation(minibatch_X)

            # Running average of the batch costs over this epoch.
            cost += network.compute_cost(minibatch_Y)/len(minibatches)

            network.backward_propagation(minibatch_Y,minibatch_X)
            network.update_parameters(learning_rate)

        network.costs.append(cost)
        if print_cost and (i % 2 == 0 or i==num_epochs-1):
            print ("Cost after epoch %i: %f" %(i, cost))

    # Plot the cost; one point is recorded per epoch.
    plt.plot(network.costs)
    plt.ylabel('cost')
    plt.xlabel('epochs')
    plt.title("Learning rate = " + str(learning_rate))
    plt.show()

    return network

In [44]:
# Build the untrained network on the training images and one-hot targets.
mynet = initialize(X_train=X_train, Y_train=Y_train)

In [45]:
# Ten epochs with batches of 60 samples, reporting the cost along the way.
train(
    network=mynet,
    num_epochs=10,
    mini_batch_size=60,
    print_cost=True,
)

Cost after epoch 0: 0.466694


KeyboardInterrupt: 

In [40]:
# Forward pass gives class probabilities with one column per test image;
# the predicted digit is the row index of the largest entry in each column.
mypredict_test = mynet.forward_propagation(X_test).argmax(axis=0)
print (mypredict_test)

# Fraction of predictions that agree with the flattened test labels.
n_correct_test = np.sum(mypredict_test == Y_test.flatten())
accu = n_correct_test / mypredict_test.shape[0]
print ("test accuracy: "+str(accu*100.0)+"%")

[7 2 1 ... 4 5 6]
test accuracy: 97.16%


In [20]:
# Forward pass gives class probabilities with one column per training image;
# the predicted digit is the row index of the largest entry in each column.
mypredict_train = mynet.forward_propagation(X_train).argmax(axis=0)
print (mypredict_train)

# Compare against the flat raw labels (Y_train itself is one-hot by now).
n_correct_train = np.sum(mypredict_train == Y_train_1)
accu = n_correct_train / mypredict_train.shape[0]
print("train accuracy: "+str(accu*100.0)+"%")

[3 0 4 ... 5 6 8]
train accuracy: 98.06%
