# **TÌM KIẾM SẢN PHẨM BẰNG HÌNH ẢNH HUẤN LUYỆN VỚI MÔ HÌNH CNN**



## **CONNECT TO DRIVE AND DECLARE PARAMS**

In [None]:
import os
from keras.models import model_from_json
from google.colab import drive
drive.mount('/content/drive/')

def get_path_label(labels):
    return os.path.join(label_path,"le_" + "_".join(labels) + ".npy")

def get_path_model_json(labels):
    return os.path.join(model_path,"model_" + "_".join(labels) + ".json")

def get_path_model_h5(labels):
    return os.path.join(model_path,"weight_" + "_".join(labels) + ".h5")

def load_model(labels):
    json_file = open(get_path_model_json(labels), 'r')
    loaded_model_json = json_file.read()
    json_file.close()
    model = model_from_json(loaded_model_json)
    model.load_weights(get_path_model_h5(labels))
    return model

# path dir
orgdata_path = os.path.join("/content/drive/MyDrive/HTTM/dataset_org")
stddata_path = os.path.join("/content/drive/MyDrive/HTTM/dataset_std")
incdata_path = os.path.join("/content/drive/MyDrive/HTTM/dataset_inc")
model_path   = os.path.join("/content/drive/MyDrive/HTTM/model")
label_path   = os.path.join("/content/drive/MyDrive/HTTM/label")

# normalized image
size = (64, 64)
nchanels = 3

Mounted at /content/drive/


## **CREATING AUGMENTED DATASET**

In [None]:
import os
import cv2
import shutil
from keras.preprocessing.image import ImageDataGenerator
from numpy import expand_dims

num_aug_data = 10

# load_data:
print('Writing dir', stddata_path)
for folder in os.listdir(orgdata_path):

    curr_path = os.path.join(orgdata_path, folder)
    save_path = os.path.join(stddata_path, folder)

    try:
        os.mkdir(save_path)
    except:
        shutil.rmtree(save_path)
        os.mkdir(save_path)

    print('Creating folder', folder + '...')

    for _folder in os.listdir(curr_path):

        _curr_path = os.path.join(curr_path, _folder)
        _save_path = os.path.join(save_path, _folder)

        os.mkdir(_save_path)

        print('   Creating folder', _folder + '...', end = ' ')

        for file in os.listdir(_curr_path):
            curr_file = os.path.join(_curr_path,file)

            image = cv2.imread(curr_file)
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            image = cv2.resize(image, dsize = size, interpolation= cv2.INTER_LINEAR)
            image = expand_dims(image, axis = 0)

            # augment data
            myImageGen = ImageDataGenerator(width_shift_range=[-5,5], height_shift_range=[-5,5], rotation_range=10, shear_range=10)
            gen = myImageGen.flow(image, batch_size=1)

            for i in range(num_aug_data):
                myBatch = gen.next()

                I = myBatch[0].astype('uint8')

                path_save = os.path.join(_save_path, file[:file.find('.')] + '_aug_' + str(i) + '_' + file[file.find('.'):])
                cv2.imwrite(path_save, I)

        print('OK!')
print('Completed!')

Writing dir /content/drive/MyDrive/HTTM/dataset_std
Creating folder washing_machine...
   Creating folder mg01... OK!
   Creating folder mg02... OK!
   Creating folder mg03... OK!
   Creating folder mg04... OK!
   Creating folder mg05... OK!
   Creating folder mg06... OK!
   Creating folder mg07... OK!
   Creating folder mg08... OK!
   Creating folder mg09... OK!
   Creating folder mg10... OK!
Creating folder fridge...
   Creating folder tl06... OK!
   Creating folder tl05... OK!
   Creating folder tl03... OK!
   Creating folder tl02... OK!
   Creating folder tl04... OK!
   Creating folder tl01... OK!
   Creating folder tl07... OK!
   Creating folder tl09... OK!
   Creating folder tl10... OK!
   Creating folder tl08... OK!
Creating folder blender...
   Creating folder mx06... OK!
   Creating folder mx03... OK!
   Creating folder mx04... OK!
   Creating folder mx02... OK!
   Creating folder mx08... OK!
   Creating folder mx09... OK!
   Creating folder mx05... OK!
   Creating folder mx10

## **CREATING INCORRECT DATASET**

In [None]:
images = []

print('Reading dir', stddata_path)
for folder in os.listdir(stddata_path):

    curr_path = os.path.join(stddata_path, folder)

    print('Reading folder', folder + '...')

    for _folder in os.listdir(curr_path):

        _curr_path = os.path.join(curr_path, _folder)

        print('   Reading folder', _folder + '...', end = ' ')

        for file in os.listdir(_curr_path):
            curr_file = os.path.join(_curr_path,file)

            image = cv2.imread(curr_file)
            images.append(image)

        print('OK!')

import random
random.shuffle(images)

print('Writing dir', incdata_path)
for folder in os.listdir(stddata_path):

    curr_path = os.path.join(stddata_path, folder)
    save_path = os.path.join(incdata_path, folder)

    try:
        os.mkdir(save_path)
    except:
        shutil.rmtree(save_path)
        os.mkdir(save_path)

    print('Creating folder', folder + '...')

    for _folder in os.listdir(curr_path):

        _save_path = os.path.join(save_path, _folder)

        os.mkdir(_save_path)

        print('   Creating folder', _folder + '...', end = ' ')

        for i in range(num_aug_data):

            path_save = os.path.join(_save_path, _folder + '_image_' + str(i) + '.jpg')
            cv2.imwrite(path_save, images[i])

        images = images[num_aug_data:]

        print('OK!')

print('Completed!')


Reading dir /content/drive/MyDrive/HTTM/dataset_std
Reading folder washing_machine...
   Reading folder mg01... OK!
   Reading folder mg02... OK!
   Reading folder mg03... OK!
   Reading folder mg04... OK!
   Reading folder mg05... OK!
   Reading folder mg06... OK!
   Reading folder mg07... OK!
   Reading folder mg08... OK!
   Reading folder mg09... OK!
   Reading folder mg10... OK!
Reading folder fridge...
   Reading folder tl06... OK!
   Reading folder tl05... OK!
   Reading folder tl03... OK!
   Reading folder tl02... OK!
   Reading folder tl04... OK!
   Reading folder tl01... OK!
   Reading folder tl07... OK!
   Reading folder tl09... OK!
   Reading folder tl10... OK!
   Reading folder tl08... OK!
Reading folder blender...
   Reading folder mx06... OK!
   Reading folder mx03... OK!
   Reading folder mx04... OK!
   Reading folder mx02... OK!
   Reading folder mx08... OK!
   Reading folder mx09... OK!
   Reading folder mx05... OK!
   Reading folder mx10... OK!
   Reading folder mx01.

# **TRAIN MODEL**
- Level = 0 and type_sp = '' : Huấn luyện model đoán loại sản phẩm
- Level = 1 and type_sp = '<tên sp>': Huấn luyện model đoán id sản phẩm

In [None]:
import os
import cv2
import numpy as np
from sklearn.model_selection import train_test_split
from keras.models import Sequential
from keras.layers import Dense, Dropout, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.utils import to_categorical
from keras_preprocessing.sequence import pad_sequences

# params traning
epochs = 100
batch_size = 128
_loss = 'categorical_crossentropy'
_optimizer = 'adam'
_metrics = ['accuracy']

# params loading data
'''
    level = 0  and type_sp = '': training product type (LEVEL 0)
    level = 1  and type_sp = '<product type>': training product id (LEVEL 1)
'''
level = 1
type_sp = 'blender' # '', 'washing_machine', ...

# declare vars
labels = []
X = []
y = []

# load_data:
data_folder = os.path.join(stddata_path, type_sp)
for folder in os.listdir(data_folder):

    if level == 0:
        labels.append(folder)
        curr_path = os.path.join(data_folder, folder)
    else:
        curr_path = data_folder

    for _folder in os.listdir(curr_path):
        _curr_path = os.path.join(curr_path, _folder)

        if level == 1:
            labels.append(_folder)

        for file in os.listdir(_curr_path):
            curr_file = os.path.join(_curr_path,file)

            image = cv2.imread(curr_file)

            X.append(image)

            if level == 0 :
                y.append(labels.index(folder))
            else:
                y.append(labels.index(_folder))

    if level == 1:
          break

# split_data
X_train_val, X_test, y_train_val, y_test = train_test_split(X, y, test_size = 0.2)
X_train, X_val, y_train, y_val = train_test_split(X_train_val, y_train_val, test_size = 0.2)

# limit length of sequences, scaler X in [0,1]
X_train = pad_sequences(X_train, maxlen = size[1],dtype = 'float32')/255.0
X_val = pad_sequences(X_val, maxlen = X_train.shape[1],dtype = 'float32')/255.0
X_test = pad_sequences(X_test, maxlen = X_train.shape[1],dtype = 'float32')/255.0
num_classes = len(labels)

# one-hot vector
y_train = to_categorical(y_train, num_classes)
y_val = to_categorical(y_val,num_classes)
y_test = to_categorical(y_test,num_classes)

# build model
model = Sequential()
input_size = (size[0], size[1], nchanels)

model.add(Conv2D(256, kernel_size=(3, 3),activation='relu', input_shape= input_size))
model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

model.add(Conv2D(128, kernel_size=(3, 3),activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

model.add(Conv2D(32, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2), padding='same'))

model.add(Flatten())

model.add(Dropout(0.1))
model.add(Dense(1024, activation='relu'))

model.add(Dropout(0.1))
model.add(Dense(512, activation='relu'))

model.add(Dense(num_classes, activation='softmax'))

# compile model
model.summary()
model.compile(loss = _loss, optimizer = _optimizer, metrics = _metrics)

# fit model
model.fit(X_train, y_train, epochs = epochs, batch_size = batch_size, validation_data = (X_val, y_val))

# save model
model.save(os.path.join(model_path, 'weight_' + type_sp + ".h5"))
model_json = model.to_json()
with open(os.path.join(model_path, 'model_' + type_sp + ".json"), 'w') as json_file:
    json_file.write(model_json)

# save labels
import numpy as np
np.save(os.path.join(label_path, 'le_' + type_sp + '.npy'), labels, allow_pickle=True)

# evaluate model
loss, acc = model.evaluate(X_test, y_test)
print("Loss: ",loss)
print("Acc: ",acc)

Model: "sequential_21"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d_84 (Conv2D)          (None, 62, 62, 256)       7168      
                                                                 
 max_pooling2d_84 (MaxPoolin  (None, 31, 31, 256)      0         
 g2D)                                                            
                                                                 
 conv2d_85 (Conv2D)          (None, 29, 29, 128)       295040    
                                                                 
 max_pooling2d_85 (MaxPoolin  (None, 15, 15, 128)      0         
 g2D)                                                            
                                                                 
 conv2d_86 (Conv2D)          (None, 13, 13, 64)        73792     
                                                                 
 max_pooling2d_86 (MaxPoolin  (None, 7, 7, 64)       

In [None]:
import os
import cv2
import numpy as np
from keras_preprocessing.sequence import pad_sequences

def predict(image, labels, is_last):
    image = cv2.resize(image, dsize = size, interpolation= cv2.INTER_LINEAR)
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = np.expand_dims(image, axis = 0)
    image_pad = pad_sequences(image, maxlen = size[1], dtype = 'float32')/255.0
    model = load_model(labels)
    pred = model.predict(image_pad)
    dict_labels = np.load(get_path_label(labels), allow_pickle=True)
    if is_last == False:
        return dict_labels[np.argmax(pred)]
    else:
        return dict_labels, pred.tolist()[0]

def predict_final(image, num_level = 2):
    labels = []
    for i in range(0,num_level):
        if i < num_level - 1:
            label = predict(image, labels, is_last = False)
            labels.append(label)
        else:
            dict_labels, pred = predict(image, labels, is_last = True)
            key_sm = [(dict_labels[i],pred[i]) for i in range(len(pred))]
            return [x[0] for x in sorted(key_sm, key = lambda x: x[1], reverse=True)]

if __name__ == '__main__':
    image = cv2.imread("/content/drive/MyDrive/HTTM/dataset_org/fridge/tl07/tl07_2.jpg")
    print("ids:", predict_final(image, 2))

ids: ['tl07', 'tl04', 'tl01', 'tl03', 'tl08', 'tl06', 'tl10', 'tl02', 'tl09', 'tl05']


## **TEST MODEL**

In [None]:
from keras_preprocessing.sequence import pad_sequences
from keras.utils import to_categorical
import pandas as pd
import numpy as np
import cv2

def load_data(typePro, path):

    X = []
    y = []

    labels = np.load(get_path_label(typePro), allow_pickle=True).tolist()

    data_folder = os.path.join(path, typePro[0])
    for folder in os.listdir(data_folder):

        if typePro[0] == '':
            curr_path = os.path.join(data_folder, folder)
        else:
            curr_path = data_folder

        for _folder in os.listdir(curr_path):
            _curr_path = os.path.join(curr_path, _folder)

            for file in os.listdir(_curr_path):
                curr_file = os.path.join(_curr_path,file)

                image = cv2.imread(curr_file)

                X.append(image)

                if typePro[0] == '':
                    y.append(labels.index(folder))
                else:
                    y.append(labels.index(_folder))

        if typePro[0] != '':
            break

    X = pad_sequences(X, maxlen = size[1],dtype = 'float32')/255.0
    y = to_categorical(y, len(labels))

    return X, y

def Evaluate():

    Pre_Products = [['']]
    Pre_Products.extend([[product] for product in os.listdir(stddata_path)])

    df = pd.DataFrame(index = ['std_dataset', 'inc_dataset'],
                      columns= ['loss', 'accuracy'])

    for product in Pre_Products:

        model = load_model(product)
        model.compile(loss = 'categorical_crossentropy', optimizer = 'adam', metrics = ['accuracy'])

        X_std, y_std = load_data(product, stddata_path)
        X_inc, y_inc = load_data(product, incdata_path)

        loss_std, acc_std = model.evaluate(X_std, y_std)
        loss_inc, acc_inc = model.evaluate(X_inc, y_inc)

        df['loss'] = [loss_std, loss_inc]
        df['accuracy'] = [acc_std, acc_inc]

        if product[0] == '':
            print('Evaluate model predict typePro: ')
        else:
            print('Evaluate model predict idPro<%s>: '%(product[0]))

        print(df, end = '\n\n')

if __name__ == '__main__':
    Evaluate()

Evaluate model predict typePro: 
                  loss  accuracy
std_dataset   0.081694  0.978484
inc_dataset  18.453041  0.152000

Evaluate model predict idPro<washing_machine>: 
                  loss  accuracy
std_dataset   0.085254     0.975
inc_dataset  13.549906     0.070

Evaluate model predict idPro<fridge>: 
                  loss  accuracy
std_dataset   0.103676  0.978667
inc_dataset  18.018713  0.100000

Evaluate model predict idPro<blender>: 
                  loss  accuracy
std_dataset   0.051648  0.987342
inc_dataset  12.475022  0.090000

Evaluate model predict idPro<cooker>: 
                  loss  accuracy
std_dataset   0.020405  0.996386
inc_dataset  14.682525  0.050000

Evaluate model predict idPro<vacuum>: 
                  loss  accuracy
std_dataset   0.077322  0.988636
inc_dataset  15.217699  0.090000



# **RUN SERVER**

In [None]:
from fastapi import FastAPI
from predict import predict_final
import uvicorn
import cv2

app = FastAPI()
@app.get("/submit/")
async def submit(path: str):
    image = cv2.imread(path)
    pred = predict_final(image)
    return {'ids':pred}

if __name__ == '__main__':
    uvicorn.run(app, host = '127.0.0.1', port = 8080)