In [1]:
import numpy as np
import matplotlib.pyplot as plt

#Evaluation index
from sklearn.metrics import accuracy_score


In [2]:
from keras.datasets import mnist

# Load the MNIST dataset
(X, y), (X_test, y_test) = mnist.load_data()


In [3]:
print(X.shape)
print(X_test.shape)
print(X[0].dtype)

(60000, 28, 28)
(10000, 28, 28)
uint8


In [4]:
#Type conversion and Normalization
X=X.astype(float)
X_test = X_test.astype(float)
X/=255
X_test/=255
print(X.max())
print(X.min())

1.0
0.0


In [5]:
#one hot Encoding
from sklearn.preprocessing import OneHotEncoder
enc = OneHotEncoder(handle_unknown='ignore', sparse=False)
y_one_hot = enc.fit_transform(y[:, np.newaxis])
y_test_one_hot = enc.transform(y_test[:, np.newaxis])
print(y.shape) # (60000,)
print(y_one_hot.shape) # (60000, 10)
print(y_one_hot.dtype) # float64

(60000,)
(60000, 10)
float64




In [6]:
#Tran Test Split
from sklearn.model_selection import train_test_split
X_train, X_valid, y_train, y_valid = train_test_split(X,y_one_hot, test_size =0.2)
print(X_train.shape)
print(X_valid.shape)
print(y_train.shape)
print(y_valid.shape)

(48000, 28, 28)
(12000, 28, 28)
(48000, 10)
(12000, 10)


In [7]:
#Prepraring NN Class

In [8]:

class FC():
    def __init__ (self, n_nodes1, n_nodes2, initializer, optimizer, activation):
        self.n_nodes1 = n_nodes1
        self.n_nodes2 = n_nodes2
        self.initializer = initializer
        self.optimizer = optimizer
        self.activation = activation

        #Initialize Function
        self.W = self.initializer.W(self.n_nodes1, self.n_nodes2)
        self.B = self.initializer.B(self.n_nodes2)
    def forward(self, X):
        self.X = X
        X_reshaped = X.reshape(X.shape[0], -1)
        self.A = np.dot(X_reshaped,self.W) + self.B
        return self.activation.forward(self.A)
    def backward(self, dZ):
        dA = self.activation.backward(dZ)
        self.dB = np.mean(dA, axis=0)
        self.dW = np.dot(self.X.T, dA)/len(self.X)
        dZ= np.dot(dA, self.W.T)
        #Update
        self = self.optimizer.update(self)
        return dZ

In [9]:
class SimpleInitializerConv2d:
    def __init__(self, sigma =0.01):
        self.sigma = sigma
    def W (self, F,C, FH, FW):
        return self.sigma * np.random.randn(F,C,FH,FW)
    def B(self, F):
        return np.zeros(F)

In [10]:
class SimpleInitializer:
    def __init__(self, sigma):
        self.sigma = sigma
    def W(self, n_nodes1, n_nodes2):
        return self.sigma * np.random.randn(n_nodes1, n_nodes2)
    def B(self, n_nodes2):
        return np.zeros(n_nodes2)

In [11]:
class HeInitialzier():
    def __init__(self):
        pass
    def W(self, n_nodes1, n_nodes2):
        return np.random.randn(n_nodes1, n_nodes2)*np.sqrt(2/n_nodes1)
    def B(self,n_nodes2):
        return np.zeros(n_nodes2)
    

In [12]:
#Optimization

In [13]:
class SGD:
    def __init__(self, lr=0.01):
        self.lr =lr
    def update(self, layer):
        layer.W -= self.lr*layer.dW
        layer.B -= self.lr*layer.dB
        return layer

In [14]:
class AdaGrad:
    def __init__(self, lr):
        self.lr = lr
        self.hW = 0
        self.hB = 0
    def update(self,layer):
        self.hW += layer.dW*layer.dW
        self.hB = layer.dB*layer.dB

        layer.W -= self.lr*layer.dW/(np.sqrt(self.hW)+1e-7)
        layer.B -= self.lr*layer.dB/(np.sqrt(self.hB)+1e-7)
        return layer

In [15]:
class ReLU:
    def __init__(self, A=None):
        self.A = A

    def forward(self, Z):
        self.A = np.maximum(0, Z)
        return self.A

    def backward(self, dA):
        dZ = np.where(self.A > 0, dA, 0)
        return dZ


In [16]:
#class ReLU():
   # def __init__(self,A):
       # pass
    #def forward(self, A):
        #self.A = A
        #return np.maximum(self.A,0)
   # def backward(self,dZ):
        #return np.where(self.A>0, dZ,0)

In [17]:
class Softmax():
    def __init__(self):
        pass
    def forward (self, A):
        return np.exp(A-np.max(A))/np.sum(np.exp(A-np.max(A)), axis-1, keepdims=True)
    def backward(self,dZ):
        return dZ
    

In [18]:
class GetMiniBatch:
    def __init__(self, X,y, batch_size =20, seed=None):
        self.batch_size = batch_size
        np.random.seed(seed)
        shuffle_index = np.random.permutation(np.arange(X.shape[0]))
        self._X = X[shuffle_index]
        self._y = y[shuffle_index]
        self._stop = np.ceil(X.shape[0]/self.batch_size).astype(np.int64)
    def __len__(self):
        return self._stop
    def __getitem__(self,item):
        p0 = item*self.batch_size
        p1 = item*self.batch_size + self.batch_size
        return self._X[p0:p1], self._y[p0:p1]
    def __iter__(self):
        self._counter = 0
        return self
    def __next__(self):
        if self._counter >= self._stop:
            raise StopIteration()
        p0 = self._counter*self.batch_size
        p1 = self._counter*self.batch_size + self.batch_size
        self._counter += 1
        return self._X[p0:p1], self._y[p0:p1]

In [19]:
class SimpleConv2d():
    def __init__(self, F,C,FH,FW,P,S, initializer=None, optimizer=None, activation=None):
        self.P = P
        self.S =S
        self.initializer = initializer
        self.optimizer = optimizer
        self.activation = activation

        #Initialize
        self.W = self.initializer.W(F,C,FH,FW)
        self.B = self.initializer.B(F)
    def output_shape2d(self, H,W,PH,PW,FH,FW,SH,SW):
        OH = (H + 2*PH -FH)/SH +1
        OW =(W +2*PW -FW)/SW +1
        return int(OH), int(OW)
    def forward(self,X):
        self. X = X
        N,C,H,W = self.X.shape
        F,C,FH,FW = self.W.shape
        OH,OW = self.output_shape2d(H,W,self.P, self.P, FH, FW, self.S, self.S)
        self.params = N,C,H,W,F,FH,FW,OH,OW
        A = np.zeros([N,F,OH,OW])
        self.X_pad = np.pad(self.X,((0,0), (0,0), (self.P, self.P),(self.P,self.P)))

        for n in range(N):
            for ch in range(F):
                for row in range(0,H,self.S):
                    for col in range(0,W,self.S):
                        A[n,ch,row,col] = \
                        np.sum(self.X_pad[n,:, row:row+FH, col:col+FW]
                               *self.W[ch,:,:,:]) \
                        +self.B[ch]
            return self.activation.forward(A)
    def backward(self, dZ):
        dA =self.activation.backward(dZ)
        N,C,H,F,FH,FW,OH,OW = self. params
        dZ = np.zeros(self.X_pad.shape)
        self.dW = np.zeros(self.W.shape)
        self.dB = np.zeros(self.B.shape)


        for n in range(N):
            for ch in range(F):
                for row in range(0,H, self.S):
                    for col in range (0,W,self.S):
                        dZ [n,:, row:row+FH,col:col+FW] += dA[n,ch,row,col]*self.W[ch,:,:,:]
        d1_rows = range(self.P), range(H+self.P, H+2*self.P,1)
        d1_cols = range(self.P), range(W+self.P, W+2*self.P,1)

        dZ = np.delete(dZ, d1_rows, axis=2)
        dZ = np.delete(dZ, d1_cols, axis=3)

        for n in range(N):
            for ch in range(F):
                for row in range(OH):
                    for col in range (OW):
                        self.dW[ch,:,:,:] += dA[n,ch,row,col]*self.X_pad[n,:,row:row+FH, col:col+FW]
            #output channels
        for ch in range(F):
            self.dB[ch] = np.sum(dA[:,ch,:,:])

            self = self.optimizer.update(self)
            return dZ
                
            

In [20]:
def output_shape2d(IH=5, IW=5, PH=0, PW=0, FH=3, FW=3, SH=1, SW=1):
    OH  = (IH +2*PH -FH)/SH +1
    OW= (IW+2*PW -FW)/SW +1
    return int(OH), int(OW)
    


In [21]:
print(output_shape2d(IH=6, IW=6, PH=0, PW=0, FH=3, FW=3, SH=1, SW=1))

(4, 4)


In [22]:
N,C,H,W = (5,1,28,28)
F,C,FH,FW = (4,1,3,3)

S=1
P =1

OH, OW = output_shape2d(H,W,P,P,FH,FW,S,S)
A = np.zeros([N,F,OH,OW])

X_sample = X[0:N].reshape(N,C,H,W)
X_pad = np.pad(X_sample,((0,0), (0,0), (P,P),(P,P)))
w = np.ones([F,C,FH,FW])
B = np.ones(F)

for n in range(N):
    for ch in range(F):
        for row in range(0, H,S):
            for col in range(0,W,S):
                A[n,ch,row,col] = \
                np.sum(X_pad[n,:,row:row+FH, col:col+FW]*w[ch,:,:,:]) +B[ch]
print('A.shape:', A.shape)


A.shape: (5, 4, 28, 28)


In [23]:
dA = np.ones(A.shape)
dZ = np.zeros(X_pad.shape)
dW = np.zeros(w.shape)
dB = np.zeros(B.shape)

for n in range(N):
    for ch in range(F):
        for row in range(0,H,S):
            for col in range (0,W,S):
                dZ[n,:, row:row+FH,col:col+FW] += dA[n,ch,row,col]*w[ch,:,:,:]
d1_rows = range(P), range(H+P, H+2*P,1)
d1_cols = range(P), range(W+P, W+2*P,1)

dZ = np.delete(dZ, d1_rows, axis=2)
dZ = np.delete(dZ, d1_cols, axis=3)
for n in range(N):
            for ch in range(F):
                for row in range(OH):
                    for col in range (OW):
                        dW[ch,:,:,:] += dA[n,ch,row,col]*X_pad[n,:,row:row+FH, col:col+FW]
            #output channels
for ch in range(F):
    dB[ch] = np.sum(dA[:,ch,:,:])
print('dZ.shape:', dZ.shape)
print('dW.shape:', dW.shape)
print('dB.shape:', dB.shape)




dZ.shape: (5, 1, 28, 28)
dW.shape: (4, 1, 3, 3)
dB.shape: (4,)


In [24]:
import numpy as np

class MaxPool2D:
    def __init__(self, P=2):
        self.P = P
        self.PA = None
        self.Pindex = None

    def forward(self, A):
        N, C, H, W = A.shape
        PS = self.P
        PH = H // PS
        PW = W // PS
        
        self.PA = np.zeros((N, C, PH, PW))
        self.Pindex = np.zeros((N, C, PH, PW), dtype=int)
        
        for n in range(N):
            for ch in range(C):
                for row in range(PH):
                    for col in range(PW):
                        # Calculate the indices for pooling window
                        start_row, start_col = row*PS, col*PS
                        end_row, end_col = start_row+PS, start_col+PS
                        
                        # Ensure indices are within bounds of the input array
                        end_row = min(end_row, H)
                        end_col = min(end_col, W)
                        
                        # Perform pooling only if the window is within bounds
                        if start_row < H and start_col < W:
                            window = A[n, ch, start_row:end_row, start_col:end_col]
                            
                            # Calculate maximum value within the pooling window
                            self.PA[n, ch, row, col] = np.max(window)
                            # Record the index of the maximum value
                            self.Pindex[n, ch, row, col] = np.argmax(window)

        return self.PA
    def backward(self, dA):
        N, C, PH, PW = dA.shape
        PS = self.P
        dA_prev = np.zeros_like(self.PA)
    
        for n in range(N):
            for ch in range(C):
                for row in range(PH):
                    for col in range(PW):
                        index = self.Pindex[n, ch, row, col]
                        # Calculate the row and column indices in dA_prev
                        dA_row = row*PS + index // PS
                        dA_col = col*PS + index % PS
                        # Check if the calculated indices are within the bounds of dA_prev
                        if dA_row < dA_prev.shape[2] and dA_col < dA_prev.shape[3]:
                            dA_prev[n, ch, dA_row, dA_col] += dA[n, ch, row, col]
        
        return dA_prev



# Example usage
X = np.random.randint(0,9,36).reshape(1,1,6,6)
print(X)
Pooling = MaxPool2D(P=2)
A = Pooling.forward(X)
print(A.shape)
print(A)


[[[[4 6 5 5 0 6]
   [6 7 5 5 0 2]
   [1 4 1 8 8 7]
   [2 3 1 0 3 2]
   [2 8 3 4 3 7]
   [2 8 2 2 1 1]]]]
(1, 1, 3, 3)
[[[[7. 5. 6.]
   [4. 8. 8.]
   [8. 4. 7.]]]]


In [25]:
Pooling.Pindex

array([[[[3, 0, 1],
         [1, 1, 0],
         [1, 1, 1]]]])

In [26]:
dA = np.random.randint(0,9,9).reshape(A.shape)
print(dA)

[[[[3 0 4]
   [6 6 3]
   [5 8 4]]]]


In [27]:
dZ = Pooling.backward(dA)
print(dZ)

[[[[0. 0. 0.]
   [0. 3. 0.]
   [0. 6. 0.]]]]


In [28]:
class Flatten:
    def __init__(self,):
        pass
    def forward(self, X):
        self.shape = X.shape
        return X.reshape(len(X), -1)
    def backward(self, X):
        return X.reshape(self.shape)

In [29]:
TEST = np.zeros([20,2,5,5])
flt = Flatten()
flat_forward = flt.forward(TEST)
print('Foward_shape:', flat_forward.shape)
print('Backward_shape:', flt.backward(flat_forward).shape)
                           

Foward_shape: (20, 50)
Backward_shape: (20, 2, 5, 5)


In [30]:
#PROBLEM 7 TRAINING AND ESTIMATION

In [31]:
class Scratch2dCNNClassifier():
    def __init__(self, NN, CNN, n_epochs=5, n_batch=1, verbose = False):
        self.n_epochs = n_epochs
        self.n_batch = n_batch
        self.verbose = verbose
        self.log_loss = np.zeros(self.n_epochs)
        self.log_acc = np.zeros(self.n_epochs)
        self.NN=NN
        self.CNN = CNN
    def loss_function(self, y,yt):
        delta = 1e-7
        return -np,mean(yt*np.log(y+delta))
    def accuracy(self, Z,Y):
        return accuracy_score(Y, Z)
    def fit(self, X,y, X_val=False, y_val=False):
        for epoch in range(self.n_epochs):
            get_mini_batch = GetMiniBatch(X,y, batch_size = self.n_batch)

            self.loss =0
            for mini_X_train, mini_y_train in get_mini_batch:
                forward_data = mini_X_train[:,np.newaxis,:,:]
                #conv
                for layer in range(len(self.CNN)):
                    forward_data = self.CNN[layer].forward(forward_data)
                #flatten
                flt = Flatten()
                forward_data = flt.forward(forward_data)
                #NN
                for layer in range(len(self.NN)):
                    forward_data = self.NN[layer].forward(forward_data)
                Z=forward_data
                #Backward propagation
                backward_data = (Z - mini_y_train)/self.n_batch
                for layer in range(len(self.NN)-1,-1,-1):
                    backward_data = self.NN[layer].backward(backward_data)
                backward_data = flt.backward(backward_data)
                for layer in range(len(self.CNN),-1,-1,-1):
                    backward_data = self.CNN[layer].backward(backward_data)
                #loss function
                self.loss += self.loss_fuction(Z,mini_y_train)
            self.log_loss[epoch] = self.loss/len(get_mini_batch)
            self.log_acc[epoch] = self.accuracy(self.predict(X), np.argmax(y,axis=1))
    def predict(self, X):
        pred_data = X[:,np.newaxis, :,:]
        #Conv
        for layer in range(len(self.CNN)):
            pred_data = self.CNN[layer].forward(pred_data)
        flt = Flatten()
        pred_data = flt.forward(pred_data)
        #NN
        for layer in range(len(self.NN)):
            pred_data = self.NN[layer].forward(pred_data)
        return np.argmax(pred_data, axis=1)
        
        
                
        

In [32]:
#All bonding layers
NN = {0:FC(784, 400, HeInitialzier(), AdaGrad(0.01), ReLU()),
      1:FC(400, 200, HeInitialzier(), AdaGrad(0.01), ReLU()),
      2:FC(200,10, SimpleInitializer(0.01), AdaGrad(0.01), Softmax()),
    
}

In [33]:
CNN = {0:SimpleConv2d(F=10, C=1, FH=3, FW=3, P=1, S=1,
                      initializer = SimpleInitializerConv2d(),
                      optimizer =SGD(),
                      activation =ReLU())
    
}

In [34]:
cnn1 = Scratch2dCNNClassifier(NN=NN, CNN=CNN, n_epochs =10, n_batch =200, verbose = False)
cnn1.fit(X_train[0:1000], y_train[0:1000])

ValueError: shapes (200,7840) and (784,400) not aligned: 7840 (dim 1) != 784 (dim 0)

In [None]:
#Estimate
y_pred = cnn1.predict(X_valid[0:1000])