# Traffic Signals Detection Model Preparation

In [1]:
import numpy as np
import os
from matplotlib import pyplot as plt
import cv2
import math
import pandas as pd
import pickle
import time
from tqdm.notebook import tnrange
from IPython.display import clear_output
import logging
logging.basicConfig(format='%(asctime)s - %(message)s', level=logging.INFO)
from keras.preprocessing.image import ImageDataGenerator
from sklearn.model_selection import train_test_split

In [2]:
epochs = 50
size = 50
ReqLabels = [1,3,7]#,14]#,17,33,34,36,37]

Train_Path = 'D:/Data/Traffic Signs/Train.csv'
Test_Path = 'D:/Data/Traffic Signs/Test.csv'
Meta_Path = 'D:/Data/Traffic Signs/Meta.csv'
Labels_Path = 'D:/Data/Traffic Signs/Labels.csv'

Labels = []
labelData = pd.read_csv(Labels_Path)
for i,j in zip(labelData['ClassId'] , labelData['SignName']) :
    if i in ReqLabels :
        Labels.append(j)
        
print(Labels)

['Speed limit (30kmph)', 'Speed limit (60kmph)', 'Speed limit (100kmph)']


In [3]:
data = pd.read_csv(Meta_Path)
NoOfLabels = len(data['Path'])
print(NoOfLabels)

43


In [4]:
Count_Labels = {}
for i in ReqLabels :
    Count_Labels[i] = 0

Count_Labels

{1: 0, 3: 0, 7: 0}

In [5]:
loc = 'D:/CNN_1.0/train_test_6.0_Size32_fixed.obj'
f = open(loc , 'rb')
X_train , Y_train , Labels = pickle.load(f)

X_train, X_test, Y_train, Y_test = train_test_split(X_train, Y_train, 
                                                    test_size=0.3,
                                                    shuffle=True,
                                                    random_state=42)

print(f"""X_train shape : {X_train.shape}, Y_train shape : {Y_train.shape}
X_test shape : {X_test.shape}, Y_test shape : {Y_test.shape}""")

X_train shape : (2762, 32, 32, 3), Y_train shape : (2762,)
X_test shape : (1185, 32, 32, 3), Y_test shape : (1185,)


In [6]:
fileName = 'Model_Conv_2.2.h5'

In [7]:
def callback(obj,fileName) :
    print('CallBack : ',fileName,'Updated')
    f = open(fileName, 'wb') 
    pickle.dump(obj, f)

In [8]:
print(f'X_train shape : {X_train.shape}, Y_train shape : {Y_train.shape}\nX_test shape : {X_test.shape}, Y_test shape : {Y_test.shape}')

X_train shape : (2762, 32, 32, 3), Y_train shape : (2762,)
X_test shape : (1185, 32, 32, 3), Y_test shape : (1185,)


In [9]:
set(Y_train)

{0, 1, 2, 3, 4, 5}

In [10]:
Labels

['Speed limit (30kmph)',
 'Speed limit (60kmph)',
 'Stop',
 'Turn right ahead',
 'Turn left ahead',
 'Negative']

### Activations

In [11]:
class ReLU :
    def __init__ (self) :
        self.__type__ = 'activation'
        self.__Name__ = 'ReLU'
    
    def feed(self,X) :
        return np.maximum(0,X)
    
    def feed_back(self,Z,grad_output,lr) :
        i = 0
        for z in Z :
            try :
                grad = z
                grad[z < 0] = 0
                grad[z > 0] = 1
                grad_output[i] = grad_output[i]*grad
                i += 1
            except Exception :
                return grad_output
        return grad_output
    
    def predict(self,Y) :
        output = np.maximum(0,Y)
        return output
    
class TanH :
    def __init__(self) :
        self.__type__ = 'activation'
        self.__Name__ = 'TanH'
    def feed(self,X) :
        outputs = []
        for x in X :
            output = np.tanh(x)
            outputs.append(output)
        return outputs
        
    def feed_back(self,Z,grad_output,lr) :
        outs = []
        for z in Z :
            outs.append(1-np.power(np.tanh(z),2))
        return np.array(outs)
    
    def predict(self,Y) :
        output = np.tanh(Y)
        return output
        
class Softmax :
    def __init__ (self) :
        self.__type__ = 'activation'
        self.__Name__ = 'Softmax'
    
    def feed(self,X) :
        outputs = []
        for x in X :
            e_x = np.exp(x - np.max(x,
                                    axis=1,
                                    keepdims=True))
            output = e_x/np.sum(e_x,
                                axis=1,
                                keepdims=True)
            outputs.append(output)
        return np.array(outputs)
    
    def grad_feed(self,X) :
        e_x = np.exp(X - np.max(X,
                                axis=1,
                                keepdims=True))
        out = (e_x/e_x.sum()) - ((e_x*e_x)/np.power(e_x.sum(),2))
        return out
    
    def feed_back(self,Z,grad_output,lr) :
        e_x = np.exp(Z - np.max(Z))
        out = e_x/np.sum(e_x)
        softmax = np.reshape(out, (1, -1))
        grad_output = np.reshape(grad_output, (1, -1))
        grad = (softmax * np.identity(softmax.size) - softmax.transpose() @ softmax)
        return grad_output*grad
    
    def predict(self,Y) :
        e_x = np.exp(Y-np.max(Y))
        output = e_x/np.sum(e_x)
        return output
    
class Sigmoid :
    def __init__ (self) :
        self.__type__ = 'activation'
        self.__Name__ = 'Sigmoid'
    
    def feed(self,X) :
        outputs = []
        for x in X :
            output = 1/(1+np.exp(-x))
            outputs.append(output)
        return outputs
    
    def grad_feed(self,X) :
        out = 1/(1+np.exp(-X))
        return out*(1-out)
    
    def feed_back(self,Z,grad_output,lr) :
        out = 1/(1+np.exp(-Z))
        grad = out*((1-out)**2)
        return grad_output*grad

### Layers

In [12]:
class Conv2D :
    
    def __init__ (self,N_F,K_S,A_F,input_shape,STRIDES=1,pad=0) :
        self.__Name__ = 'Conv2D'
        self.__type__ = 'conv'
        self.N_F = N_F
        self.K_S = K_S
        self.STRIDES = STRIDES
        self.A_F = A_F
        self.pad = pad
        if (len(input_shape) == 2) :
            input_shape = input_shape+(1,)
        if input_shape[0] is not None :
            self.input_shape = (None,) + input_shape
        else :
            self.input_shape = input_shape
        #self.weights = np.random.normal(size=(self.N_F,self.input_shape[-1],self.K_S,self.K_S))
        self.weights = np.random.randint(-1,2,(self.N_F,self.input_shape[-1],self.K_S,self.K_S))
        self.W = int((self.input_shape[1]-self.K_S+2*self.pad)/self.STRIDES) + 1
        self.H = int((self.input_shape[2]-self.K_S+2*self.pad)/self.STRIDES) + 1
        self.D = self.N_F
        self.bias = np.zeros((self.N_F))
        self.output_shape = (None,) + (self.W,self.H,self.D)
        self.Batch_W = []
        self.Batch_B = []
        self.mdw = np.zeros((self.input_shape[1], self.output_shape[1])) 
        self.mdb = np.zeros((1,self.output_shape[1]))
        
        self.vdw = np.zeros((self.input_shape[1], self.output_shape[1])) 
        self.vdb = np.zeros((1,self.output_shape[1]))
        
    def feed(self,X) :
        outputs = []
        for x in X :
            if self.pad >0 :
                x_ = np.zeros(((x.shape[0]+self.pad*2),x.shape[1]+self.pad*2,x.shape[2]))
                x_[self.pad:-self.pad,self.pad:-self.pad] = x
                x = x_
            output = np.zeros((self.W,self.H,self.D))
            h = 0
            for i in range(0,self.input_shape[1],self.STRIDES) :
                w = 0
                for j in range(0,self.input_shape[2],self.STRIDES) :
                    cur_reg = x[i:i+self.K_S,j:j+self.K_S].T*self.weights
                    output[h,w] = np.sum(cur_reg,axis=(1,2,3)) + self.bias
                    w += 1
                h += 1
            outputs.append(output)
        return np.array(outputs)
    
    def feed_back(self, Z , output_error, learning_rate=[1e-02],decay=1e-03,opt='sgd',b1=.9,b2=.999) :
        outs = []
#         _,i_c,i_dim,__ = output_error.shape
#         lr = learning_rate[np.random.randint(len(learning_rate))]
#         for z,out_err in zip(Z,output_error) :
#             d_op = np.zeros(z.shape)
#             d_conv = np.zeros(self.weights.shape)
#             d_bias = np.zeros(self.bias.shape)
#             # dX[h:h+f, w:w+f] += W * dH(h,w)
#             for i in range(0,i_c) :
#                 for j in range(0,i_dim) :
#                     for c in range(len(out_err[i,j])) :
#                         d_conv[c] += (z[i:i+self.K_S,j:j+self.K_S] * out_err[i,j][c]).T
#             d_bias = np.sum(out_err,axis=(0,1))
#             self.weights = self.weights - lr * d_conv
#             self.bias = self.bias - lr * d_bias
# #             self.Batch_W.append(weights)
# #             self.Batch_B.append(bias)
# #             outs.append(d_op)
# #         self.weights = np.mean(self.Batch_W,axis=0)
# #         self.bias = np.mean(self.Batch_B,axis=0)
# #         self.Batch_W = []
# #         self.Batch_B = []
        
        return np.array(outs)

    def predict(self,Y) :
        output = np.zeros((self.W,self.H,self.D))
        h = 0
        if self.pad >0 :
            x_ = np.zeros(((Y.shape[0]+self.pad*2),Y.shape[1]+self.pad*2,Y.shape[2]))
            x_[self.pad:-self.pad,self.pad:-self.pad] = Y
            Y = x_
        for i in range(0,self.input_shape[1],self.STRIDES) :
            w = 0
            for j in range(0,self.input_shape[2],self.STRIDES) :
                cur_reg = Y[i:i+self.K_S,j:j+self.K_S].T*self.weights
                output[h,w] = np.sum(cur_reg,axis=(1,2,3)) + self.bias
                w += 1
            h += 1
        return output
    
    def plotImg(self,X=None,figSize=(5,5)) :
        if X is None :
            return
        for x in X :
            Filter_SIZE = int(x.shape[-1]**(1/2))
            _, axs = plt.subplots(Filter_SIZE,Filter_SIZE, figsize=figSize)
            axs = axs.flatten()
            for i , ax in enumerate(axs) :
                img = x[:,:,i]
                ax.axis('off')
                ax.imshow(img)
            plt.show()
        
    def Summary(self) :
        l = len(self.__Name__)
        print(f'{self.__Name__}',' '*(20-l),self.input_shape,' '*(20-len(str(self.input_shape))),self.output_shape)
        
# img = X_train[10]
# plt.imshow(img)
# plt.show()
# conv = Conv2D(16,3,'ReLU',input_shape=(img.shape[0],img.shape[1]),STRIDES=2,pad=1)

# conv.Summary()
# img = conv.feed(np.expand_dims(img,axis=0))
# conv.plotImg(img)

In [13]:
class MaxPool2D :
    
    def __init__ (self,K_S,input_shape,STRIDES=2,pad=0) :
        self.__Name__ = 'MaxPool2D'
        self.__type__ = 'pool'
        self.K_S = K_S
        self.STRIDES = STRIDES
        self.A_F = None
        self.pad = pad
        self.input_shape = input_shape
        self.output_shape = (None,) + (int((self.input_shape[1]-self.K_S+1)/self.STRIDES),int((self.input_shape[2]-self.K_S+1)/self.STRIDES),self.input_shape[-1])
        self.Batch_W = []
        self.Batch_B = []
        self.weights = np.zeros((self.input_shape[1:]))
        self.bias = np.zeros((self.output_shape[1:]))
    
    def feed(self,X) :
        outputs = []
        for x in X :
            if self.pad >0 :
                x_ = np.zeros(((x.shape[0]+self.pad*2),x.shape[1]+self.pad*2,x.shape[2]))
                x_[self.pad:-self.pad,self.pad:-self.pad] = x
                x = x_
            W,H,D = self.output_shape[1:]
            output = np.zeros((W,H,D))
            w , h = 0 , 0
            for j in range(0,self.input_shape[1]-self.K_S,self.STRIDES) :
                for i in range(0,self.input_shape[2]-self.K_S,self.STRIDES) :
                    output[w,h] = np.max(x[j:j+self.K_S,i:i+self.K_S].T,axis=(1,2))
                    h += 1
                h = 0
                w += 1
            outputs.append(output)
        return np.array(outputs)
    
    def create_mask_from_window(self,x):
        mask = x == np.max(x)
        return mask
    
    def feed_back(self, Z , output_error, learning_rate=[1e-03],decay=1e-03,opt='sgd') :
        outs = []
#         _,W,H,C = output_error.shape
#         for z,out_err in zip(Z,output_error) :
#             d_op = np.zeros(z.shape)
#             for x in range(0,W) :
#                 for y in range(0,H) :
#                     for c in range(C) :
#                         roi = z[x:x+self.K_S,y:y+self.K_S,c]
#                         mask = self.create_mask_from_window(roi)
#                         #update_y ,update_x = np.unravel_index(np.argmax(roi),roi.shape)
#                         d_op[x:x+self.K_S,y:y+self.K_S,c] = np.multiply(mask, out_err[x,y,c]) #output_error[out_y,out_x]
#             outs.append(d_op)
        return np.array(outs)
    
    def predict(self,Y) :
        W,H,D = self.output_shape[1:]
        output = np.zeros((W,H,D))
        w , h = 0 , 0
        for i in range(0,self.input_shape[1]-self.K_S,self.STRIDES) :
            for j in range(0,self.input_shape[2]-self.K_S,self.STRIDES) :
                output[w,h] = np.max(Y[j:j+self.K_S,i:i+self.K_S].T,axis=(1,2))
                h += 1
            h = 0
            w += 1
        return output
    
    def plotImg(self,X=None) :
        if X is None :
            return
        for x in X :
            Filter_SIZE = int(x.shape[-1]**(1/2))
            _, axs = plt.subplots(Filter_SIZE,Filter_SIZE, figsize=(8,8))
            axs = axs.flatten()
            for i , ax in enumerate(axs) :
                img = x[:,:,i]
                ax.axis('off')
                ax.imshow(img)
            plt.show()
        
    def Summary(self) :
        l = len(self.__Name__)
        print(f'{self.__Name__}',' '*(20-l),self.input_shape,' '*(20-len(str(self.input_shape))),self.output_shape)

In [14]:
class Flatten :
    
    """
        Flatten class is used to convert data into single dimension
    """
    
    def __init__ (self,input_shape=None,output_shape=None,__Name__='Flatten') : ### Constructor called when we create object 
        self.__Name__ = __Name__ ### Defining __Name__ variable with Flatten
        self.__type__ = 'flat' ### Defining __type__ variable
        if input_shape[0] is not None :
            self.input_shape = (None,) + input_shape
        else :
            self.input_shape = input_shape
        self.A_F = None
        re = 1
        if output_shape is None :
            for i in self.input_shape[1:] :
                re *= i
            self.output_shape = (None,re)
        else :
            self.output_shape = output_shape
        self.Batch_W = []
        self.Batch_B = []
        self.weights = np.zeros((self.input_shape[1:]))
        self.bias = np.zeros((self.output_shape[1:]))
        
    def feed(self,X) : ### feed function is used to transforms data into single dimension
        outputs = []
        for x in X :
            output = x.ravel() ### ravel is used to convert data into single dimensoin or flattens data
            outputs.append(np.expand_dims(output,axis=0))
        return np.array(outputs)
    
    def feed_back(self, Z , output_error, learning_rate=[1e-03],decay=1e-03,opt='sgd') :
        outs = []
        for z,out_err in zip(Z,output_error) :
            outs.append(out_err.reshape(self.input_shape[1:]))
        return np.array(outs)
    
    def predict(self,Y) :
        return Y.ravel()
    
    def Summary(self) :
        l = len(self.__Name__)
        print(f'{self.__Name__}',' '*(20-l),self.input_shape,' '*(20-len(str(self.input_shape))),self.output_shape)

In [15]:
class Dropout :
    def __init__(self,input_shape=None,drop=.5,__Name__ = 'Dropout',A_F=None) :
        self.__Name__ = __Name__
        self.__type__ = 'drop'
        self.N_F = drop
        self.input_shape = input_shape
        self.output_shape = self.input_shape
        self.weights = np.random.binomial(1, drop, self.input_shape[1:]) / drop
        self.A_F = A_F
        self.Batch_W = []
        
    def feed(self,X) :
        outputs = []
        for x in X :
            outputs.append(x*self.weights.T)
        return np.array(outputs)
    
    def feed_back(self, Z , output_error, learning_rate=[1e-03],decay=1e-03,opt='sgd') :
        return output_error * Z / self.N_F
    
    def predict(self,Y) :
        return Y*self.weights
    
    def Summary(self) :
        l = len(self.__Name__)
        print(f'{self.__Name__}',' '*(20-l),self.input_shape,' '*(20-len(str(self.input_shape))),self.output_shape)

In [16]:
class Dense :
    
    def __init__ (self,input_shape,N_F,A_F=None,wt=None,bias=None,output_shape=None,__Name__='Dense') :
        
        """
            Wt and Bias range [-2.4/No of nodes , 2.4/No of nodes] Proposed by range
        """
        self.__Name__ = __Name__
        self.__type__ = 'dense'
        self.input_shape = input_shape
        self.N_F = N_F
        self.A_F = A_F
        
        if output_shape is None :
            self.output_shape = (None,N_F)
        else :
            self.output_shape = (None,) + output_shape
        if wt is None :
            #self.weights = np.random.uniform(-2.4/self.N_F,2.4/self.N_F,(self.input_shape[1], self.output_shape[1]))* np.sqrt(2/self.N_F) 
            self.weights = 0.1*np.random.randn(self.input_shape[1] , self.output_shape[1]) / np.sqrt(self.input_shape[1] + self.output_shape[1])
        else :
            self.weights = wt
        if bias is None :
            #self.bias = np.random.uniform(-2.4/ self.N_F,2.4/ self.N_F,(1, self.output_shape[1]))
            self.bias = np.random.randn(1,self.output_shape[1]) / (self.input_shape[1] + self.output_shape[1])
        else :
            self.bias = bias
        
    def feed(self,X) :
        outputs = []
        for x in X :
            output = np.matmul(x,self.weights) + self.bias
            outputs.append(output)
        return np.array(outputs)

    def feed_back(self, Z , output_error, learning_rate=[1e-03],decay=1e-08,opt='sgd',b1=.9,b2=.999,mom=.02,n_iter=2):
        outs = []
        
        self.Batch_W = []
        self.Batch_B = []
        weights , bias = self.weights , self.bias
        
        lr = learning_rate[np.random.randint(len(learning_rate))]
        
        for z,out_err in zip(Z,output_error) :
            
            self.mdw = np.zeros((self.input_shape[1], self.output_shape[1])) 
            self.mdb = np.zeros((1,self.output_shape[1]))

            self.vdw = np.zeros((self.input_shape[1], self.output_shape[1])) 
            self.vdb = np.zeros((1,self.output_shape[1]))
            
            b1 = .9
            
            outs.append(np.dot(out_err,weights.T))
            
            weights_error = np.dot(z.T, out_err)
            
            if opt.lower() == 'gd' :
                self.weights = self.weights - lr * weights_error
                self.bias = self.bias - lr * out_err
                
            elif opt.lower() == 'sgd' :
                self.weights -= lr * weights_error - (1-b1) * weights_error * weights_error
                self.bias -= lr * out_err - (1-b1) * out_err * out_err
                
            elif opt.lower() == 'rmsprop' :
                self.vdw , self.vdb = RMSProp(weights_error,out_err,self.vdw,self.vdb)
                self.weights -= (lr/(np.sqrt(self.vdw + decay))) * weights_error
                self.bias -= (lr/(np.sqrt(self.vdb + decay))) * out_err
            
            elif opt.lower() == 'adam' :
                
                for t in range(n_iter) :
                    self.mdw,self.mdb,self.vdw,self.vdb,mdw_corr,mdb_corr,vdw_corr,vdb_corr = AdamOptimizer(weights_error,
                                                                                                            out_err,
                                                                                                            self.mdw,self.mdb,
                                                                                                            self.vdw,self.vdb,
                                                                                                            t,b1=b1)

                    self.weights = self.weights - (lr/(np.sqrt(vdw_corr+decay))) * mdw_corr
                    self.bias = self.bias - (lr/(np.sqrt(vdb_corr+decay))) * mdb_corr
                    
                    if n_iter > 1 :
                        
                        b1 *= (1-decay)

                        out = np.matmul(z,weights) + bias

                        weights_error = np.dot(z.T, out_err)
                        
        return np.array(outs)
    
    def predict(self,Y) :
        return np.matmul(Y,self.weights) + self.bias
    
    def Summary(self) :
        l = len(self.__Name__)
        print(f'{self.__Name__}',' '*(20-l),self.input_shape,' '*(20-len(str(self.input_shape))),self.output_shape)

In [17]:
def RMSProp(dw,db,vdw,vdb,b1=.9) :
    
    vdw = b1 * vdw + (1-b1) * dw ** 2
    vdb = b1 * vdb + (1-b1) * db ** 2
    
    return (vdw,vdb)

def AdamOptimizer(dw,db,mdw,mdb,vdw,vdb,n_iter,b1=.9,b2=.999) :
    
    mdw = b1 * mdw + (1-b1) * dw
    mdb = b1 * mdb + (1-b1) * db
    
    vdw_ = b2 * vdw + (1-b2) * dw ** 2
    vdb_ = b2 * vdb + (1-b2) * db ** 2
    
    vdw = np.maximum(vdw_,vdw)
    vdb = np.maximum(vdb_,vdb)
    
    mdw_corr = mdw / (1-np.power(b1,n_iter+1))
    mdb_corr = mdb / (1-np.power(b1,n_iter+1))
    
    vdw_corr = vdw / (1-np.power(b2,n_iter+1))
    vdb_corr = vdb / (1-np.power(b2,n_iter+1))
    
    return (mdw,mdb,vdw,vdb,mdw_corr,mdb_corr,vdw_corr,vdb_corr)

In [18]:
def PlotAcc(model) : ### Accuracy Plot
    plt.plot(range(model.ep),model.acc,c='b',label='acc')
    plt.plot(range(model.ep),model.val_acc,c='r',label='val_acc')
    plt.legend()
    plt.show()

def PlotError(model) : ### Error Plot
    plt.plot(range(model.ep),model.error,c='b',label='loss')
    plt.plot(range(model.ep),model.val_error,c='r',label='val_loss')
    plt.legend()
    plt.show()

In [19]:
class Sequential :
    
    """
        Sequential is a class which is used to stack layers of model and to fit , predict , predicting classes of our given i/p
    """
    
    def __init__ (self) :
        self.Layers = []
        self.input_shape = None
        self.Activations = []
        self.acc = []
        self.val_acc = []
        self.error = []
        self.val_error = []
        self.id = 1
        self.ep = 1
        
    def add(self,Layer) :
        boo = False
        for layer in self.Layers :
            if Layer.__type__ == layer.__type__ :
                boo = True
                if '_' not in Layer.__Name__ :
                    Layer.__Name__ += '_'+str(self.id)
                name,k = Layer.__Name__.split('_')
                Layer.__Name__ = name+'_'+str(int(k)+1)
        if not boo :
            if '_' not in Layer.__Name__ :
                Layer.__Name__ += '_'+str(self.id)
        
        self.Layers.append(Layer)
        if Layer.__type__ != 'activation' :
            if self.input_shape is None :
                self.input_shape = Layer.input_shape
            self.output_shape = Layer.output_shape
        if Layer.A_F is not None :
            if Layer.A_F.lower() == 'softmax' :
                self.Activations.append(Softmax())
            elif Layer.A_F.lower() == 'sigmoid' :
                self.Activations.append(Sigmoid())
            elif Layer.A_F.lower() == 'tanh' :
                self.Activations.append(TanH())
            else :
                self.Activations.append(ReLU())
        else :
            self.Activations.append(None)
    
    def ShuffleData(self,X,Y) :
        order = np.random.randint(0,len(Y),(len(Y)))
        for i in range(len(Y)-1) :
            X[order[i]] , X[order[i+1]] = X[order[i+1]] , X[order[i]]
            Y[order[i]] , Y[order[i+1]] = Y[order[i+1]] , Y[order[i]]
        return (X , Y)
    
    def SplitData(self,X,Y,split) :
        input_data , output_data , val_input_data , val_output_data = None , None , None , None
        N = int(len(Y) * (1-split))
        while True :
            X , Y = self.ShuffleData(X,Y)
            if set(Y[:N]) != set(Y[N:]) :
                X ,Y = self.ShuffleData(X,Y)
            else :
                input_data , output_data = X[:N] , Y[:N]
                val_input_data , val_output_data = X[N:] , Y[N:]
                break
        return (input_data , output_data , val_input_data , val_output_data)
        
    def compile(self,optimizer='sgd',loss='cross_entropy',metrics=['acc']) :
        self.optimizer = optimizer
        self.loss = loss
        self.metrics = metrics
        
    def one_hot_encode(self,l) :
        Labels = np.zeros(self.Layers[-1].output_shape[1:])
        Labels[l] = 1
#         for i,label in enumerate(labels) :
#             Labels[i][label] = 1

        return Labels
    
    def fit(self,train_data,valid_data=None,validation_split=.1,epochs=10,lr=[1e-02],decay=1e-03,batch_size=8,model_updation_epoch=2,PlotView=10,Labels=None) :
        self.epochs = epochs
        self.batch_size = batch_size
        input_data , output_data = None , None
        val_input_data , val_target_data = None , None
        if train_data is None :
            raise ValueError('Training Data Required')
        else :
            input_data = train_data[0]
            output_data = train_data[1]
        N = len(input_data)
        
        if valid_data is None :
            input_data , output_data , val_input_data , val_output_data = self.SplitData(input_data , output_data , validation_split)
        else :
            val_input_data , val_output_data = valid_data[0] , valid_data[1]
        
        if Labels is not None :
            self.Labels = Labels
        
        print('\nModel Fitting\n')
        
        N = len(output_data)
        
        for ep in range(epochs) :
#             self.Layers[0].plotImg(self.Layers[0].feed(np.expand_dims(input_data[0],axis=0)))
            model.ep = ep + 1
            start_ep = time.time()
            error = 0
            acc = 0
            print(f'\nepoch : {ep+1}/{epochs}')
            
            for b,batch in enumerate(tnrange(0,N-batch_size+1,batch_size,desc='batch')) :
                loss = None
                X , Y = input_data[batch:batch+batch_size] , output_data[batch : batch+batch_size]
                
                """
                    Feed Forward
                """
                
                outputs = X
                
                outs = {i.__Name__ : {'lay':None,'act':None} for i in self.Layers}
                ins = {i.__Name__ : {'lay':None,'act':None} for i in self.Layers}
                
                for lay , act in zip( self.Layers , self.Activations ) :
                    ins[lay.__Name__]['lay'] = outputs
                    outputs = lay.feed(outputs)
                    outs[lay.__Name__]['lay'] = outputs
                    if act is not None :
                        ins[lay.__Name__]['act'] = outputs
                        outputs = act.feed(outputs)
                        outs[lay.__Name__]['act'] = outputs
                    else :
                        ins[lay.__Name__]['act'] = outputs
                        outs[lay.__Name__]['act'] = outputs
                        
                        
                """
                    Loss
                """
                
                out_errs = []
                losses = []
                if self.loss == 'cross_entropy' :
                    for i,j,k in zip(outs[self.Layers[-1].__Name__]['act'] , Y , outs[self.Layers[-1].__Name__]['lay']) :
                        loss = self.crossentropy(i,j)#,k)
                        losses.append(loss)
                        print('\rLoss : %f \t' % (loss) , end="")
                        
                        if math.isnan(loss) :
                            return None
                        
                        #grad_activation = self.Activations[-1].grad_feed(k)
                        out_err = self.grad_crossentropy(k,j)#*grad_activation
                        
                        out_errs.append(np.expand_dims(out_err,axis=0))
                        
                out_errs = np.array(out_errs)
                
                print('\rLoss : %f' % (np.mean(losses)) , end="")
                
                """
                    Backward Feed
                """
                
                for i in range(1,len(self.Layers)-1) :
                    nam = self.Layers[-i].__Name__
                    na = self.Activations[-i]
                    abcd = time.time()
                    if na is not None and na.__Name__ != 'Softmax' :
                        out_errs = self.Activations[-i].feed_back(ins[nam]['act'],out_errs,lr)
                    out_errs = self.Layers[-i].feed_back(ins[nam]['lay'],out_errs,lr,decay,opt=self.optimizer)

            error = loss
    
            accuracy = sum([y == np.argmax(model.predict(x)[0]) for x, y in zip(input_data, output_data)]) / N
            
            ac , loss = [] , []
            
            for x, y in zip(val_input_data, val_output_data) :
                out,lay_out = model.predict(x)
                if y == np.argmax(out) :
                    ac.append(1)
                else :
                    ac.append(0)
                
            ac = np.mean(ac)
            
            print('\racc=%f , val_acc=%f' % (accuracy ,ac))
        
            if ep == 0 :
                callback(model,fileName)
            else :
                if max(model.acc) < accuracy :
                    callback(model,fileName)
                    
            self.acc.append(accuracy)
            self.val_acc.append(ac)
            
            self.error.append(error)
            self.val_error.append(loss)
            
            if not (ep+1)%PlotView :
                PlotAcc(self)
#                 PlotError(self)
            
        return self
    
    def mse(self,y_true, y_pred):
        return np.mean(np.power(y_true - y_pred, 2))
    
    def mse_prime(self,y_true, y_pred):
        return 2 * (y_pred - y_true) / y_pred.size
    
    def transfer_derivative(self,output):
        return output * (1.0 - output)
    
    def binary_crossentropy(self,pred,Truth) :
        GroundTruth = np.zeros(self.No_of_outs)
        GroundTruth[Truth] = 1
        return -np.mean(GroundTruth*np.log(pred)+(1-GroundTruth)*np.log(1-pred))
    
    def binary_grad_crossentropy(self,pred,Truth) :
        GroundTruth = np.zeros(self.No_of_outs)
        GroundTruth[Truth] = 1
        return -((GroundTruth/pred)-((1-GroundTruth)/(1-pred)))
    
    def cat_crossentropy(self,pred,Truth,out) :
        Truth = self.one_hot_encode(Truth)
        return np.dot(Truth.ravel()/(pred.ravel()+1e-8) , self.Activations[-1].grad_feed(out).ravel())

    def grad_cat_crossentropy(self,pred,Truth) :
        Truth = self.one_hot_encode(Truth)
        a = Truth/pred
        b = self.Activations[-1].grad_feed(self.Layers[-1].output)[0]
        return np.dot((Truth/pred),self.Activations[-1].grad_feed(self.Layers[-1].output)[0])
    
    def crossentropy(self,logits,reference_answers) :
        return - logits[0][reference_answers] + np.log(np.sum(np.exp(logits),axis=-1))
    
    def grad_crossentropy(self,logits,reference_answers):
        logits = np.ravel(logits)
        ones_for_answers = np.zeros_like(logits)
        ones_for_answers[reference_answers] = 1
        e_x = np.exp(logits-np.max(logits))
        softmax = e_x / e_x.sum(axis=-1,keepdims=True)
        return (- ones_for_answers + softmax) / logits.shape[0]
    
    def showImg(self,X) :
        plt.imshow(X)
        plt.show()
    
    def predict(self,X):
        outputs = []
        lay_out = None
        if X.shape == model.input_shape[1:] :
            output = X
            for layer , activation in zip(self.Layers,self.Activations) :
                output = layer.predict(output)
                lay_out = output
                if activation is not None :
                    output = activation.predict(output)
            outputs.append(output)
        else :
            for output in X :
                for layer , activation in zip(self.Layers,self.Activations) :
                    output = layer.predict(output)
                    if activation is not None :
                        output = activation.feed(output)
                outputs.append(output)
        return np.array(outputs) , lay_out
    
    def pred_class(self,X) :
        classes = []
        if X.shape == model.input_shape :
            output = self.predict(X)
            return np.argmax(output)
        else :
            for output in X :
                output = self.predict(output)
                classes.append(np.argmax(output))
            return np.array(classes)
    
    def Summary(self) :
        print('='*60)
        print('Model Summary')
        print('_'*60)
        print('Layers',' '*(20-len('Layers')),'Input Shape',' '*(20-len('Input Shape')),'Output Shape',' '*(20-len('Output Shape')))
        print('='*60)
        for Layer in self.Layers :
            if Layer.__type__ != 'activation' :
                Layer.Summary()
                print('_'*60)
        print('='*60)

In [20]:
print(f"""X_train shape : {X_train.shape}, Y_train shape : {Y_train.shape}
X_test shape : {X_test.shape}, Y_test shape : {Y_test.shape}""")

X_train shape : (2762, 32, 32, 3), Y_train shape : (2762,)
X_test shape : (1185, 32, 32, 3), Y_test shape : (1185,)


In [21]:
### Model Initiating

model = Sequential()

model.add(Conv2D(16,3,input_shape=X_train[0].shape,A_F='ReLU',STRIDES=2,pad=1))
# model.add(Conv2D(32,3,input_shape=model.output_shape,A_F='ReLU'))
model.add(MaxPool2D(3,input_shape=model.output_shape))

# # model.add(Conv2D(64,3,input_shape=model.output_shape,A_F='ReLU'))
# model.add(MaxPool2D(3,input_shape=model.output_shape,pad=1))

model.add(Flatten(input_shape=model.output_shape))

# model.add(Flatten(input_shape=X_train[0].shape))

# model.add(Dense(input_shape=model.output_shape,N_F=100,A_F='ReLU'))

# model.add(Dropout(input_shape=model.output_shape))

model.add(Dense(input_shape=model.output_shape,N_F=len(set(Y_train)),A_F='Softmax'))

model.compile(optimizer='adam',loss='cross_entropy',metrics=['acc'])

model.Summary()

Model Summary
____________________________________________________________
Layers                Input Shape           Output Shape         
Conv2D_1              (None, 32, 32, 3)     (None, 16, 16, 16)
____________________________________________________________
MaxPool2D_1           (None, 16, 16, 16)    (None, 7, 7, 16)
____________________________________________________________
Flatten_1             (None, 7, 7, 16)      (None, 784)
____________________________________________________________
Dense_1               (None, 784)           (None, 6)
____________________________________________________________


In [None]:
model = model.fit(train_data=(X_train,Y_train),
                  epochs=50,
                  valid_data=(X_test,Y_test),
                  lr=[1e-05],
                  decay=1e-08,
                  batch_size=1,
                  Labels=Labels)


Model Fitting


epoch : 1/50


HBox(children=(FloatProgress(value=0.0, description='batch', max=2762.0, style=ProgressStyle(description_width…

Loss : 2.015418 	

In [None]:
pred = model.predict(X_train[0])
print(pred)
print(np.argmax(pred))
plt.imshow(X_train[0])
plt.show()