In [221]:
import numpy as np
import pickle
import os
import random
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
from copy import deepcopy
%matplotlib inline

<h2>Phase 1: Preprocess Data</h2>

In [222]:
# Load the pre-extracted features and labels of the 4-fruit dataset (train and test splits).
# NOTE: pickle.load can execute arbitrary code while unpickling, so these files
# must come from a trusted source.
with open("Feature_extraction/train_set_features_4_fruits.pkl", "rb") as f:
    train_set_features2 = pickle.load(f)

with open("Feature_extraction/train_set_labels_4_fruits.pkl", "rb") as f:
    train_set_labels = pickle.load(f)

with open("Feature_extraction/test_set_features_4_fruits.pkl", "rb") as f:
    test_set_features2 = pickle.load(f)

with open("Feature_extraction/test_set_labels_4_fruits.pkl", "rb") as f:
    test_set_labels = pickle.load(f)

In [223]:
# Reducing feature vector length: keep only the high-variance feature columns.
# The column mask is computed on the training set only and then reused for the
# test set, so both sets contain exactly the same features in the same order.
# (Selecting the test columns from the test set's own statistics can pick
# different columns than the ones the network is trained on.)
# Assumes train and test matrices share the same raw column layout - TODO confirm.
features_STDs = np.std(a=train_set_features2, axis=0)
selected_features = features_STDs > 52.3
train_set_features = train_set_features2[:, selected_features]

# Changing the range of data: divide by the largest training value so that it
# becomes 1. The same scale factor is applied to the test set so a given raw
# value maps to the same network input in both sets (test values may therefore
# slightly exceed 1).
feature_scale = train_set_features.max()
train_set_features = np.divide(train_set_features, feature_scale)

test_set_features = test_set_features2[:, selected_features]
test_set_features = np.divide(test_set_features, feature_scale)

In [224]:
train_set = []
test_set = []

# Each sample becomes a (features, label) pair: a (102, 1) feature column and
# a (4, 1) one-hot int32 label column.
for sample_idx, sample_features in enumerate(train_set_features):
    one_hot = np.zeros((4, 1), dtype='int32')
    one_hot[int(train_set_labels[sample_idx]), 0] = 1
    train_set.append((sample_features.reshape(102, 1), one_hot))

for sample_idx, sample_features in enumerate(test_set_features):
    one_hot = np.zeros((4, 1), dtype='int32')
    one_hot[int(test_set_labels[sample_idx]), 0] = 1
    test_set.append((sample_features.reshape(102, 1), one_hot))

# Randomise the sample order of both sets in place.
random.shuffle(train_set)
random.shuffle(test_set)

In [None]:
# Dimensions of the reduced training feature matrix and of the training label array.
print(train_set_features.shape)
print(train_set_labels.shape)

In [None]:
# Shape of one sample's feature column (reshaped to (102, 1) when train_set was built).
train_set[0][0].shape

<h2>Phase 2: Forward Propagation</h2>

In [227]:
def sigmoid(x):
    """Return the logistic function 1 / (1 + e^(-x)), applied elementwise to x."""
    exp_neg = np.exp(-x)
    return 1 / (exp_neg + 1)

In [228]:
def initialize_parameters(layer_dims):
    """Create the weights and biases of a fully connected network.

    Args:
        layer_dims: sequence of layer sizes, input layer first.

    Returns:
        dict with, for every layer l >= 1, 'W<l>' drawn from a standard normal
        with shape (layer_dims[l], layer_dims[l - 1]) and 'b<l>' filled with
        zeros with shape (layer_dims[l], 1).
    """
    parameters = {}

    size_pairs = zip(layer_dims[:-1], layer_dims[1:])
    for layer, (fan_in, fan_out) in enumerate(size_pairs, start=1):
        parameters['W{}'.format(layer)] = np.random.randn(fan_out, fan_in)
        parameters['b{}'.format(layer)] = np.zeros((fan_out, 1))

    return parameters

In [229]:
def forward(x, parameters):
    """Run x through every layer of the network.

    Each layer computes Z = W @ A_prev + b followed by A = sigmoid(Z).

    Args:
        x: network input, multiplied from the left by W1.
        parameters: dict holding 'W<l>' and 'b<l>' for every layer l.

    Returns:
        (output, middleputs): the last layer's activation and a dict with the
        'Z<l>' and 'A<l>' values of every layer (needed for backpropagation).
    """
    middleputs = {}
    activation = x
    layer_count = len(parameters) // 2  # one W and one b per layer

    for layer in range(1, layer_count + 1):
        suffix = str(layer)
        pre_activation = np.matmul(parameters['W' + suffix], activation) + parameters['b' + suffix]
        middleputs['Z' + suffix] = pre_activation
        activation = sigmoid(pre_activation)
        middleputs['A' + suffix] = activation

    return activation, middleputs

In [230]:
def test(dataset, parameters):
    """Measure classification accuracy of the network on a dataset.

    A sample counts as correct when the index of the largest network output
    equals the index of the largest entry of its label.

    Args:
        dataset: iterable of (feature, label) pairs.
        parameters: network parameters as consumed by ``forward``.

    Returns:
        {'accuracy': percentage of correct samples, rounded to two decimals}.
    """
    hits = [
        np.argmax(forward(feature, parameters)[0], axis=0).item()
        == np.argmax(label, axis=0).item()
        for feature, label in dataset
    ]

    accuracy = 100 * (sum(hits) / len(hits))
    return {'accuracy': float('{:.2f}'.format(accuracy))}

In [None]:
# Sanity check of the forward pass: accuracy of a freshly initialised (untrained)
# 102-150-60-4 network on the first 200 samples of the shuffled training set.
parameters = initialize_parameters([102, 150, 60, 4])
test(train_set[:200], parameters)

<h2>Phase 3: Non-Vectorized Backward Propagation</h2>

In [235]:
def cost_se(predicted, target):
    """Return the squared-error cost: the sum of squared differences between predicted and target."""
    residual = predicted - target
    return np.square(residual).sum()

In [236]:
def sigmoid_deriv(x):
    """Return the derivative of the sigmoid at x: sigmoid(x) * (1 - sigmoid(x))."""
    activation = sigmoid(x)
    return activation * (1 - activation)

<p style='font-size: 17px'><em>Warning: before running the cell below, first run the Phase 7 code cell that defines <code>LR_Decay</code>, because <code>train</code> uses it.</em></p>

In [237]:
def train(epochs, batch_size, lr, dataset, layer_dims, backward_func, lr_decay_weight=0):
    """Train a fully connected sigmoid network with mini-batch gradient descent.

    Args:
        epochs: number of passes over the dataset.
        batch_size: number of samples per mini-batch.
        lr: initial learning rate handed to ``LR_Decay``.
        dataset: list of (x, y) pairs; it is deep-copied, so the caller's list
            is never reordered.
        layer_dims: layer sizes handed to ``initialize_parameters``.
        backward_func: callable ``(x, y, parameters, middleputs) -> grads``
            returning a dict with a 'dW<l>' and 'db<l>' entry for every layer.
        lr_decay_weight: decay coefficient handed to ``LR_Decay``.

    Returns:
        (parameters, costs): the trained parameters and, for every epoch, the
        squared-error cost averaged over all samples.
    """
    dataset = deepcopy(dataset)
    parameters = initialize_parameters(layer_dims)
    costs = []
    lr_scheduler = LR_Decay(lr, lr_decay_weight)
    num_layers = len(parameters) // 2  # one W and one b per layer

    for epoch in tqdm(range(epochs), leave=False, bar_format='{percentage:02.0f}%  {bar}  epoch {n_fmt}/{total_fmt}'):
        random.shuffle(dataset)
        cost = 0
        epoch_lr = lr_scheduler.step()

        # Number of mini-batches, counting a final shorter batch if there is one.
        batch_num = len(dataset) // batch_size
        if len(dataset) % batch_size != 0:
            batch_num += 1

        for i in range(batch_num):
            # Slicing clamps at the end of the list, so the last batch may be shorter.
            batch = dataset[i * batch_size:(i + 1) * batch_size]

            # Gradient accumulators, summed over the samples of this batch.
            grads = {}
            for l in range(1, num_layers + 1):
                grads['dW'+str(l)] = np.zeros_like(parameters['W'+str(l)])
                grads['db'+str(l)] = np.zeros_like(parameters['b'+str(l)])

            for x, y in batch:
                output, middleputs = forward(x, parameters)
                cost += cost_se(output, y)
                s_grads = backward_func(x, y, parameters, middleputs)

                for l in range(1, num_layers + 1):
                    grads['dW'+str(l)] = grads['dW'+str(l)] + s_grads['dW'+str(l)]
                    grads['db'+str(l)] = grads['db'+str(l)] + s_grads['db'+str(l)]

            # Average over the samples actually in the batch: dividing by
            # batch_size would under-scale the update of a shorter final batch.
            batch_len = len(batch)
            for l in range(1, num_layers + 1):
                parameters['W'+str(l)] = parameters['W'+str(l)] - (epoch_lr * (grads['dW'+str(l)] / batch_len))
                parameters['b'+str(l)] = parameters['b'+str(l)] - (epoch_lr * (grads['db'+str(l)] / batch_len))

        costs.append(cost / len(dataset))

    return parameters, costs

In [238]:
def nonvectorized_backward(x, label, parameters, middleput):
    """Backpropagate the squared-error cost of one sample using explicit loops.

    Every layer, from the last down to the first, is handled by the same code;
    the first layer uses the network input ``x`` as its incoming activation,
    so single-layer networks are supported as well.

    Args:
        x: input column vector (indexed as ``x[j, 0]``).
        label: target column vector (indexed as ``label[i, 0]``).
        parameters: dict holding 'W<l>' and 'b<l>' for every layer l.
        middleput: dict holding the 'Z<l>' and 'A<l>' values of the forward pass.

    Returns:
        dict with 'dW<l>' and 'db<l>' for every layer, shaped like the
        corresponding 'W<l>' and 'b<l>'.
    """
    num_layers = len(parameters) // 2
    grads = {}

    dA = None      # dCost/dA of the layer currently being processed
    slopes = None  # sigmoid'(Z) of the layer currently being processed

    for l in range(num_layers, 0, -1):
        b_shape = parameters['b'+str(l)].shape

        if l == num_layers:
            # Output layer: derivative of sum((A - label)^2) with respect to A.
            dA = np.zeros(b_shape)
            for i in range(dA.shape[0]):
                dA[i, 0] += 2 * (middleput['A'+str(l)][i, 0] - label[i, 0])
        else:
            # Propagate through layer l+1; `slopes` still holds that layer's
            # sigmoid'(Z) values from the previous iteration.
            dA_prev = dA
            slopes_next = slopes
            W_next = parameters['W'+str(l+1)]

            dA = np.zeros(b_shape)
            for i in range(dA.shape[0]):
                for j in range(dA_prev.shape[0]):
                    dA[i, 0] += W_next[j, i] * slopes_next[j] * dA_prev[j, 0]

        # sigmoid'(Z) of this layer, evaluated once per unit rather than once
        # per weight inside the nested loops below.
        Z = middleput['Z'+str(l)]
        slopes = [sigmoid_deriv(Z[i, 0]) for i in range(Z.shape[0])]

        # Activation feeding this layer: the network input for the first layer.
        A_in = x if l == 1 else middleput['A'+str(l-1)]

        W_shape = parameters['W'+str(l)].shape
        dW = np.zeros(W_shape)
        for i in range(W_shape[0]):
            for j in range(W_shape[1]):
                dW[i, j] = A_in[j, 0] * slopes[i] * dA[i, 0]
        grads['dW'+str(l)] = dW

        db = np.zeros(b_shape)
        for i in range(b_shape[0]):
            db[i, 0] = slopes[i] * dA[i, 0]
        grads['db'+str(l)] = db

    return grads

In [None]:
%%time

# Train with the loop-based backward pass on the first 200 samples of the
# shuffled training set, report the accuracy on those same samples and plot
# the per-epoch average cost returned by train.
lr = 1
batch_size = 10
epochs = 10

parameters, costs = train(epochs, batch_size, lr, train_set[:200], layer_dims=[102, 150, 60, 4], backward_func=nonvectorized_backward)
result = test(train_set[:200], parameters)
print('train accuracy is {:.2f}%'.format(result['accuracy']))
plt.plot(costs)
plt.show()

<h2>Phase 4: Vectorized Backward Propagation</h2>

In [240]:
def vectorized_backward(x, label, parameters, middleput):
    """Backpropagate the squared-error cost of one sample using matrix operations.

    Every layer, from the last down to the first, is handled by the same code;
    the first layer uses the network input ``x`` as its incoming activation,
    so single-layer networks are supported as well.

    Args:
        x: network input (its transpose is multiplied with the first layer's dZ).
        label: target, subtracted from the last layer's activation.
        parameters: dict holding 'W<l>' and 'b<l>' for every layer l.
        middleput: dict holding the 'Z<l>' and 'A<l>' values of the forward pass.

    Returns:
        dict with 'dW<l>' and 'db<l>' for every layer.
    """
    num_layers = len(parameters) // 2
    grads = {}

    # Derivative of sum((A - label)^2) with respect to the output activation.
    dA = 2 * (middleput['A'+str(num_layers)] - label)

    for l in range(num_layers, 0, -1):
        dZ = dA * sigmoid_deriv(middleput['Z'+str(l)])

        # Activation feeding this layer: the network input for the first layer.
        A_in = x if l == 1 else middleput['A'+str(l-1)]
        grads['dW'+str(l)] = np.matmul(dZ, A_in.T)
        grads['db'+str(l)] = dZ

        if l > 1:
            # Cost derivative with respect to the previous layer's activation.
            dA = np.matmul(parameters['W'+str(l)].T, dZ)

    return grads

In [None]:
%%time

# Train the network total_train times from scratch on the full training set
# with the vectorized backward pass, then report the train accuracy and the
# per-epoch cost curve, both averaged over the runs.
total_train = 10
lr = 1
batch_size = 10
epochs = 20

total_costs = []
total_accuracy = 0

for i in tqdm(range(total_train), leave=False, bar_format='{n_fmt} / {total_fmt} {bar}'):
    parameters, costs = train(epochs, batch_size, lr, train_set, layer_dims=[102, 150, 60, 4], backward_func=vectorized_backward)
    total_costs.append(costs)
    result = test(train_set, parameters)
    total_accuracy += result['accuracy']
    
# Element-wise mean of the cost curves of all runs.
costs = (np.sum(np.array(total_costs), axis=0) / total_train).tolist()
total_accuracy /= total_train

print('train accuracy is {:.2f}%'.format(total_accuracy))
plt.plot(costs)
plt.show()

<h2>Phase 5: Final Test</h2>

In [242]:
def multi_train_test(total_train, epochs, batch_size, lr, train_ds, test_ds, layers_dims, lr_decay_weight=0):
    """Train the network several times and report averaged results.

    Each of the ``total_train`` runs trains a fresh network with
    ``vectorized_backward`` and evaluates it on both datasets. The mean train
    accuracy and mean test accuracy are printed, and the per-epoch cost curve
    averaged over the runs is plotted.

    Args:
        total_train: number of independent training runs.
        epochs, batch_size, lr, lr_decay_weight: forwarded to ``train``.
        train_ds: dataset used for training and for the train accuracy.
        test_ds: dataset used for the test accuracy.
        layers_dims: layer sizes forwarded to ``train``.
    """
    cost_curves = []
    train_accuracies = []
    test_accuracies = []

    for _ in tqdm(range(total_train), leave=False, bar_format='{n_fmt} / {total_fmt} {bar}'):
        parameters, run_costs = train(epochs, batch_size, lr, train_ds, layers_dims,
                                      backward_func=vectorized_backward, lr_decay_weight=lr_decay_weight)
        cost_curves.append(run_costs)
        train_accuracies.append(test(train_ds, parameters)['accuracy'])
        test_accuracies.append(test(test_ds, parameters)['accuracy'])

    # Element-wise mean of the cost curves of all runs.
    mean_costs = (np.sum(np.array(cost_curves), axis=0) / total_train).tolist()
    mean_train_accuracy = sum(train_accuracies) / total_train
    mean_test_accuracy = sum(test_accuracies) / total_train

    print('train accuracy is {:.2f}%'.format(mean_train_accuracy))
    print('test accuracy is {:.2f}%'.format(mean_test_accuracy))
    plt.plot(mean_costs)
    plt.show()

In [None]:
%%time

# Baseline run on the 4-fruit data: 10 epochs, batch size 10, learning rate 1,
# averaged over 10 independent trainings.
multi_train_test(total_train=10, 
                 epochs=10, 
                 batch_size=10, 
                 lr=1, 
                 train_ds=train_set, 
                 test_ds=test_set, 
                 layers_dims=[102, 150, 60, 4])

<h2>Phase 6: Hyperparameter Analysis</h2>

In [None]:
%%time

# Variation of the Phase 5 settings: more epochs (15 instead of 10).
multi_train_test(total_train=10, 
                 epochs=15, 
                 batch_size=10, 
                 lr=1, 
                 train_ds=train_set, 
                 test_ds=test_set, 
                 layers_dims=[102, 150, 60, 4])

In [None]:
%%time

# Variation of the Phase 5 settings: smaller batches (5 instead of 10).
multi_train_test(total_train=10, 
                 epochs=10, 
                 batch_size=5, 
                 lr=1, 
                 train_ds=train_set, 
                 test_ds=test_set, 
                 layers_dims=[102, 150, 60, 4])

In [None]:
%%time

# Variation of the Phase 5 settings: larger batches (15 instead of 10).
multi_train_test(total_train=10, 
                 epochs=10, 
                 batch_size=15, 
                 lr=1, 
                 train_ds=train_set, 
                 test_ds=test_set, 
                 layers_dims=[102, 150, 60, 4])

In [None]:
%%time

# Variation of the Phase 5 settings: twice as many epochs (20 instead of 10).
multi_train_test(total_train=10, 
                 epochs=20, 
                 batch_size=10, 
                 lr=1, 
                 train_ds=train_set, 
                 test_ds=test_set, 
                 layers_dims=[102, 150, 60, 4])

In [None]:
%%time

# Variation of the Phase 5 settings: learning rate 0.1 requested, with 40 epochs.
multi_train_test(total_train=10, 
                 epochs=40, 
                 batch_size=10, 
                 lr=.1, 
                 train_ds=train_set, 
                 test_ds=test_set, 
                 layers_dims=[102, 150, 60, 4])

In [None]:
%%time

# Variation of the Phase 5 settings: learning rate 0.01 requested, with 100 epochs.
multi_train_test(total_train=10, 
                 epochs=100, 
                 batch_size=10, 
                 lr=.01, 
                 train_ds=train_set, 
                 test_ds=test_set, 
                 layers_dims=[102, 150, 60, 4])

In [None]:
%%time

# Variation of the Phase 5 settings: larger learning rate (2 instead of 1).
multi_train_test(total_train=10, 
                 epochs=10, 
                 batch_size=10, 
                 lr=2, 
                 train_ds=train_set, 
                 test_ds=test_set, 
                 layers_dims=[102, 150, 60, 4])

<h2>Phase 7: Learning Rate Decay</h2>

In [251]:
class LR_Decay:
    """Exponential learning-rate schedule with a lower bound.

    The t-th call to ``step`` yields ``initial_lr * exp(-k * t)``, but never
    less than the floor. The floor is ``min_lr`` capped at ``initial_lr``:
    without the cap, an initial rate below the floor (e.g. 0.1 with the
    default floor of 1) would be raised to the floor instead of being used.

    Args:
        initial_lr: learning rate returned by the first call to ``step``.
        k: decay coefficient; 0 keeps the rate constant.
        min_lr: lowest rate the schedule may decay to (default 1).
    """

    def __init__(self, initial_lr, k, min_lr=1):
        self.initial_lr = initial_lr
        self.t = 0
        self.k = k
        # The floor must not exceed the starting rate.
        self.min_lr = min(min_lr, initial_lr)

    def step(self):
        """Return the learning rate for the current step and advance the step counter."""
        value = self.initial_lr * np.exp(-self.k * self.t)
        self.t += 1
        return max(value, self.min_lr)

In [None]:
%%time

# Same 4-fruit network, now with learning-rate decay: initial rate 1.5,
# decay weight 0.2, 30 epochs.
multi_train_test(total_train=10, 
                 epochs=30, 
                 batch_size=10, 
                 lr=1.5, 
                 train_ds=train_set, 
                 test_ds=test_set, 
                 layers_dims=[102, 150, 60, 4],
                 lr_decay_weight=0.2)

<h2>Phase 8: Deeper Network</h2>
<p style='font-size: 19px'>In this last phase, three more fruits are added to the dataset, for a total of seven, so we also need a deeper model.</p>
<p style='font-size: 15px'><em>Note: feel free to add more fruits to the dataset, but you will need to adjust the feature extraction slightly and make your model more complex.</em></p>

<h3>Preprocess Data</h3>

In [253]:
# Load the pre-extracted features and labels of the 7-fruit dataset (train and test splits).
# These assignments replace the 4-fruit arrays loaded earlier under the same names.
# NOTE: pickle.load can execute arbitrary code while unpickling, so these files
# must come from a trusted source.
# NOTE(review): the first file name has a single underscore before "7_fruits",
# the other three have two - confirm this matches the files on disk.
with open("Feature_extraction/train_set_features_7_fruits.pkl", "rb") as f:
    train_set_features2 = pickle.load(f)

with open("Feature_extraction/train_set_labels__7_fruits.pkl", "rb") as f:
    train_set_labels = pickle.load(f)

with open("Feature_extraction/test_set_features__7_fruits.pkl", "rb") as f:
    test_set_features2 = pickle.load(f)

with open("Feature_extraction/test_set_labels__7_fruits.pkl", "rb") as f:
    test_set_labels = pickle.load(f)

In [254]:
# Reducing feature vector length: keep only the high-variance feature columns.
# The column mask is computed on the training set only and then reused for the
# test set, so both sets contain exactly the same features in the same order.
# (Selecting the test columns from the test set's own statistics can pick
# different columns than the ones the network is trained on.)
# Assumes train and test matrices share the same raw column layout - TODO confirm.
features_STDs = np.std(a=train_set_features2, axis=0)
selected_features = features_STDs > 43.4
train_set_features = train_set_features2[:, selected_features]

# Changing the range of data: divide by the largest training value so that it
# becomes 1. The same scale factor is applied to the test set so a given raw
# value maps to the same network input in both sets (test values may therefore
# slightly exceed 1).
feature_scale = train_set_features.max()
train_set_features = np.divide(train_set_features, feature_scale)

test_set_features = test_set_features2[:, selected_features]
test_set_features = np.divide(test_set_features, feature_scale)

In [255]:
train_set_optional = []
test_set_optional = []

# Each sample becomes a (features, label) pair: a (118, 1) feature column and
# a (7, 1) one-hot int32 label column.
for sample_idx, sample_features in enumerate(train_set_features):
    one_hot = np.zeros((7, 1), dtype='int32')
    one_hot[int(train_set_labels[sample_idx]), 0] = 1
    train_set_optional.append((sample_features.reshape(118, 1), one_hot))

for sample_idx, sample_features in enumerate(test_set_features):
    one_hot = np.zeros((7, 1), dtype='int32')
    one_hot[int(test_set_labels[sample_idx]), 0] = 1
    test_set_optional.append((sample_features.reshape(118, 1), one_hot))

# Randomise the sample order of both sets in place.
random.shuffle(train_set_optional)
random.shuffle(test_set_optional)

In [None]:
# Dimensions of the reduced 7-fruit training feature matrix and of its label array.
print(train_set_features.shape)
print(train_set_labels.shape)

In [None]:
# Shape of one sample's feature column (reshaped to (118, 1) when train_set_optional was built).
train_set_optional[0][0].shape

<h3>Train and Test</h3>

In [None]:
%%time

# 7-fruit experiment with a deeper network: 118 inputs, hidden layers of
# 150, 60 and 60 units, 7 outputs; 20 epochs, batch size 10, learning rate 1.
multi_train_test(total_train=10, 
                 epochs=20, 
                 batch_size=10, 
                 lr=1, 
                 train_ds=train_set_optional, 
                 test_ds=test_set_optional, 
                 layers_dims=[118, 150, 60, 60, 7])