<a href="https://colab.research.google.com/github/daidoAI/classify_vehicle_registration/blob/main/1_1_Train.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Prepare

In [None]:
# Change the working directory so that the relative paths used by later cells
# ('train_datasets', 'test_datasets', 'models/train') resolve against it.
# NOTE(review): a bare `cd` relies on IPython automagic, and this absolute path
# is machine-specific - adjust it before running elsewhere.
cd /data/me/Docs_Classification/Docs_Classification/

In [None]:
import os
import os.path as op
from glob import glob
from sklearn.model_selection import train_test_split as split
import cv2
from imgaug import augmenters as iaa
import imgaug as ia
import tensorflow as tf
from tensorflow.keras.layers import *
import numpy as np
import random
import matplotlib.pyplot as plt
from IPython.display import clear_output
import shutil

In [None]:
# Class names handled by the classifier. A name's position in this list is the
# integer class id used for training targets, so the order must stay stable.
# The last entry ('other') is also used as the fallback class when a prediction
# falls below the confidence threshold.
labels = [
    'birth_certificate',
    'chip_back',
    'chip_front',
    'new_back',
    'new_front',
    'old_back',
    'old_front',
    'new_driver_license',
    'old_driver_license',
    'passport_foreign',
    'passport_vn',
    'other',
]

# Model input geometry: images are resized to `resize` x `resize` pixels and
# carry `channel` colour planes.
resize, channel = 224, 3

# Prepare Datasets

In [None]:
# Collect image paths per label for the train / validation / test splits.
# Expected layout on disk: <root>/<label>/**/<image file>, with root being
# 'train_datasets' or 'test_datasets'.
_IMAGE_EXTENSIONS = ('.jpg', '.jpeg', '.png')
_VAL_FRACTION = 0.05   # share of 'train_datasets' images held out for validation
_SPLIT_SEED = 42       # fixed seed so the hold-out set is identical on every run


def _list_images(root, label):
    """Return the sorted image file paths found recursively under root/label."""
    pattern = op.join(root, label, '**')
    return sorted(
        path for path in glob(pattern, recursive=True)
        if path.lower().endswith(_IMAGE_EXTENSIONS) and op.isfile(path)
    )


# A dedicated, seeded generator (plus the sorted listing above) keeps the
# train/validation assignment reproducible. Because training resumes from a
# saved checkpoint, re-drawing the split on every run would let images the
# model was already trained on end up in the validation set.
_split_rng = np.random.RandomState(_SPLIT_SEED)

x_train = {}
x_val = {}
x_test = {}
for label in labels:
    x_train[label] = []
    x_val[label] = []
    for path in _list_images('train_datasets', label):
        if _split_rng.uniform() > _VAL_FRACTION:
            x_train[label].append(path)
        else:
            x_val[label].append(path)
    x_test[label] = _list_images('test_datasets', label)

# Report how many images each label has in every split.
for _title, _split_paths in (('Training', x_train),
                             ('\nValidation', x_val),
                             ('\nTesting', x_test)):
    print(_title)
    for label in _split_paths:
        print(f'{label}: {len(_split_paths[label])}')

# Preprocessing

In [None]:
def _build_augmenter():
    """Create the imgaug pipeline used to distort training images.

    For every image a random subset of the listed augmenters (anything from
    none to all of them) is applied in random order.
    """
    return iaa.SomeOf((0, None), [
        iaa.AdditiveGaussianNoise(scale=(0, 0.1*255), per_channel=True),
        iaa.Cutout(nb_iterations=1, size=0.2, squared=False, fill_mode="constant", cval=(0, 255)),
        iaa.GaussianBlur(sigma=(0.0, 1.0)),
        iaa.WithBrightnessChannels(iaa.Add((-50, 50)), from_colorspace=iaa.CSPACE_BGR),
        iaa.MultiplySaturation((0.75, 1.25)),
        iaa.GammaContrast((0.75, 1.5)),
        # Flips are intentionally left out of the pipeline.
        iaa.Affine(scale=(0.75, 1.25), mode=ia.ALL, cval=(0, 255)),
        iaa.Affine(translate_percent={"x": (-0.2, 0.2), "y": (-0.2, 0.2)}, mode=ia.ALL, cval=(0, 255)),
        iaa.Affine(rotate=(-180, 180), mode=ia.ALL, cval=(0, 255)),
        iaa.Affine(shear=(-10, 10), mode=ia.ALL, cval=(0, 255)),
        iaa.PiecewiseAffine(scale=(0.01, 0.03)),
        iaa.PerspectiveTransform(scale=(0.01, 0.05)),
        # Upper bound is 255 (was 256) to match every other fill value above
        # and stay inside the uint8 range.
        iaa.CropAndPad(percent=(-0.15, 0.15), pad_mode=ia.ALL, pad_cval=(0, 255))
    ], random_order=True)


def preprocessing_img(paths,
                      labels=None,
                      train=False,
                      resize=resize):
    """Load images from disk and turn them into a normalised batch.

    Args:
        paths: Iterable of image file paths.
        labels: Optional sequence of targets aligned with `paths`; returned as
            a NumPy array when given.
        train: When True, random augmentation is applied to the loaded images.
        resize: Side length in pixels; every image is resized to
            `resize` x `resize`.

    Returns:
        `images` as a float64 array scaled by 1/255, or `(images, labels)`
        when `labels` is not None. Channel order is whatever `cv2.imread`
        produces (BGR).

    Raises:
        ValueError: If a path cannot be read as an image.
    """
    images = []
    for path in paths:
        image = cv2.imread(path)
        if image is None:
            # cv2.imread signals failure by returning None; fail here with the
            # offending path instead of an opaque error inside cv2.resize.
            raise ValueError(f'Cannot read image: {path}')
        images.append(cv2.resize(image, (resize, resize)))

    # The augmenter is only needed (and only built) for training batches.
    if train and images:
        images = _build_augmenter()(images=images)

    images = np.array(images).astype(np.float64)
    images = images/255

    if labels is not None:
        return images, np.array(labels)
    return images

# Build Model

In [None]:
def build_model(resize=resize,channel=channel):
    """Assemble the classifier: ImageNet MobileNetV2 features plus a softmax head.

    Args:
        resize: Height and width of the input images.
        channel: Number of input channels.

    Returns:
        An uncompiled `tf.keras.Model` mapping a (resize, resize, channel)
        image to one probability per entry of the module-level `labels`.
    """
    inputs = Input(shape=(resize, resize, channel))

    # Backbone without its classification top. EfficientNetB7 and ResNet152
    # from tf.keras.applications were tried here as alternatives.
    backbone = tf.keras.applications.mobilenet_v2.MobileNetV2(
        include_top=False,
        weights='imagenet',
    )
    features = backbone(inputs)

    # Classification head: flatten -> dropout -> one softmax unit per label.
    flat = Flatten()(features)
    regularised = Dropout(0.5)(flat)
    outputs = Dense(len(labels), activation='softmax')(regularised)

    return tf.keras.Model(inputs=inputs, outputs=outputs)

# Checkpoint holding the weights with the lowest validation loss so far.
ckpt_path = 'models/train/MobileNetV2.1.h5'

# Build the network and compile it for integer-encoded class targets.
model1 = build_model()
model1.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=1e-5),
    loss=tf.keras.losses.SparseCategoricalCrossentropy(),
    metrics=['accuracy'],
)

# Resume from a previous run when a checkpoint is already on disk.
if op.exists(ckpt_path):
    print('Load pretrained weights')
    model1.load_weights(ckpt_path)

# Smoke test: push one random image through the model and print the output shape.
X_check_model = np.random.random((1, resize, resize, channel))
print(model1.predict(X_check_model).shape)

# Define Train and Test

In [None]:
def train(model = model1,
          labels = labels,
          x_train = x_train,
          sample = 180,
          x_val = x_val,
          x_test = None,
          batch_size = 8,
          epochs = 1000,
          ckpt_path = 'models/train',
          threshold = 0.5):
    """Train `model` on class-balanced epochs and checkpoint it.

    Each epoch draws up to `sample` random images per label, shuffles them,
    and fits the model one mini-batch at a time with augmentation enabled.
    After every epoch the model is evaluated on `x_val` (if given) and saved
    as 'MobileNetV2.1.h5' whenever the validation loss does not get worse;
    the latest state is always saved as 'MobileNetV2.1.last.h5'. Accuracy and
    loss curves are redrawn every 10 epochs.

    Args:
        model: Compiled Keras model to train.
        labels: List of class names; a name's index is its integer target.
        x_train: Dict mapping label name -> list of training image paths.
        sample: Maximum number of images drawn per label and epoch.
        x_val: Dict label -> list of paths used for validation, or None.
        x_test: Optional extra evaluation input. A dict is scored like the
            validation set; anything else is forwarded to `test(..., show=True)`.
        batch_size: Number of images per training step.
        epochs: Number of epochs to run.
        ckpt_path: Directory that receives the checkpoint files.
        threshold: Minimum confidence passed on to `test`.

    Raises:
        ValueError: If an epoch yields fewer than `batch_size` images, so no
            training step could be run.
    """
    if ckpt_path:
        # Make sure the target directory exists before the first save.
        os.makedirs(ckpt_path, exist_ok=True)

    min_val_loss = float('inf')
    _epoch = []
    train_acc = []
    train_loss = []
    if x_val is not None:
        val_acc = []
        val_loss = []
    for epoch in range(epochs):

        ##### Train #####
        print(f'Epoch {epoch+1}')

        # Draw up to `sample` images per label. The number of targets must
        # follow the number of images actually drawn, otherwise labels with
        # fewer than `sample` images shift every later image/target pair.
        x_train_epoch = []
        y_train_epoch = []
        for label in x_train:
            picked = random.sample(x_train[label], min(sample, len(x_train[label])))
            x_train_epoch.extend(picked)
            y_train_epoch.extend([labels.index(label)] * len(picked))

        # Shuffle paths and targets with one shared permutation; this keeps
        # them aligned without reseeding the global NumPy generator.
        order = np.random.permutation(len(x_train_epoch))
        x_train_epoch = [x_train_epoch[i] for i in order]
        y_train_epoch = [y_train_epoch[i] for i in order]

        # Only full batches are used; a trailing partial batch is dropped.
        total_iteration = len(y_train_epoch)//batch_size
        if total_iteration == 0:
            raise ValueError(
                f'Not enough training images ({len(y_train_epoch)}) '
                f'for a single batch of {batch_size}')
        iter_acc = []
        iter_loss = []

        for iteration in range(total_iteration):
            start = batch_size*iteration
            stop = start + batch_size
            X_train_iter, Y_train_iter = preprocessing_img(paths=x_train_epoch[start:stop],
                                                           labels=y_train_epoch[start:stop],
                                                           train=True,
                                                           resize=resize)

            if iteration == 0:
                # Show one augmented sample per epoch as a visual sanity check.
                sample_image = (X_train_iter[0]*255).astype(np.uint8)
                sample_image = cv2.cvtColor(sample_image, cv2.COLOR_BGR2RGB)
                plt.imshow(sample_image)
                plt.show()
                print(f'Label: {labels[Y_train_iter[0]]}')
            history = model.fit(x=X_train_iter,
                                y=Y_train_iter,
                                verbose=0)
            iter_acc.append(history.history['accuracy'][-1])
            iter_loss.append(history.history['loss'][-1])

        # Epoch metrics are the mean of the per-batch training metrics.
        _train_acc = np.mean(iter_acc)*100
        _train_loss = np.mean(iter_loss)
        print(f'Train accuracy: {_train_acc:.2f}%')
        print(f'Train loss: {_train_loss}')
        train_acc.append(_train_acc)
        train_loss.append(_train_loss)
        _epoch.append(epoch+1)

        ##### Val #####
        if x_val is not None:
            _val_acc, _val_loss = test(model, labels, x_val, threshold)
            _val_acc = _val_acc*100
            print(f'Validation accuracy: {_val_acc:.2f}%')
            print(f'Validation loss: {_val_loss}')
            if _val_loss <= min_val_loss:
                # Best (or equal) validation loss so far: keep this model.
                min_val_loss = _val_loss
                model.save(op.join(ckpt_path,'MobileNetV2.1.h5'))
            val_acc.append(_val_acc)
            val_loss.append(_val_loss)

        ##### Test #####
        if x_test is not None:
            if isinstance(x_test, dict):
                _test_acc, _test_loss = test(model, labels, x_test, threshold)
                _test_acc = _test_acc*100
                print(f'Test accuracy: {_test_acc:.2f}%')
                print(f'Test loss: {_test_loss}')
            else:
                test(model, labels, x_test, show=True)

        # Every 10 epochs, wipe the cell output and redraw the learning curves.
        if (epoch+1)%10 == 0:
            clear_output()
            fig1, (ax1, ax2) = plt.subplots(1, 2, figsize=(22, 6))
            ax1.plot(_epoch, train_acc, label = 'Training Accuracy')
            if x_val is not None:
                ax1.plot(_epoch, val_acc, label = 'Validation Accuracy')
            ax1.set_title('Model Accuracy')
            ax1.set_xlabel('Epoch')
            ax1.set_ylabel('Accuracy')
            ax1.set_ylim(95, 100)
            ax1.legend(loc='lower right')
            ax1.grid(True)
            ax2.plot(_epoch, train_loss, label = 'Training Loss')
            if x_val is not None:
                ax2.plot(_epoch, val_loss, label = 'Validation Loss')
            ax2.set_title('Model Loss')
            ax2.set_xlabel('Epoch')
            ax2.set_ylabel('Loss')
            ax2.legend(loc='upper right')
            ax2.grid(True)
            plt.show()

        # Always keep the most recent state, independent of validation loss.
        model.save(op.join(ckpt_path,'MobileNetV2.1.last.h5'))

In [None]:
def test(model = model1,
         labels = labels,
         x_test = {},
         threshold = 0.5,
         show = False,
         show_false = False):
    """Evaluate or run the classifier, depending on the type of `x_test`.

    A prediction whose top probability is below `threshold` is replaced by
    the last entry of `labels` (the fallback class).

    Args:
        model: Compiled Keras model; `model.loss` is used for the dict case.
        labels: List of class names; index == integer class id.
        x_test: One of
            * dict label -> list of image paths: labelled evaluation,
            * str: a single image path,
            * list/tuple of str: several image paths.
        threshold: Minimum top probability for a prediction to be accepted.
        show: Display every processed image together with its prediction.
        show_false: (dict case only) display misclassified images.

    Returns:
        * dict input: `(accuracy, mean_loss)` with accuracy in [0, 1].
        * str input: the predicted label name.
        * list/tuple input: list of predicted label names.

    Raises:
        ValueError: If a dict input contains no image paths.
        TypeError: If `x_test` is none of the supported types.
    """
    fallback = len(labels)-1  # index of the catch-all class

    if isinstance(x_test, dict): # Provide a dict containing both paths and results
        samples = [(path, labels.index(label))
                   for label in x_test
                   for path in x_test[label]]
        if not samples:
            # Accuracy and loss are undefined without any image.
            raise ValueError('x_test contains no images to evaluate')

        true = 0
        loss = []
        for x_test_1, y_test_1 in samples:
            X_test_1 = preprocessing_img(paths=[x_test_1],
                                         labels=None,
                                         train=False,
                                         resize=resize)
            Y_pred_1 = model.predict(X_test_1,verbose=0)[0]
            y_pred_1 = np.argmax(Y_pred_1)
            if Y_pred_1[y_pred_1] < threshold: # confidence must reach the threshold
                y_pred_1 = fallback
            if y_test_1 == y_pred_1:
                true += 1
            if show or (show_false and y_test_1 != y_pred_1):
                print(f'\nImage path: {x_test_1}')
                image = cv2.imread(x_test_1)
                # OpenCV loads BGR; matplotlib expects RGB.
                image = cv2.cvtColor(image,cv2.COLOR_BGR2RGB)
                plt.imshow(image)
                plt.show()
                print(f'True label: {labels[y_test_1]}')
                print(f'Predicted label: {labels[y_pred_1]}')
                print('\n')
            # The loss is computed on the raw probabilities, not on the
            # thresholded prediction.
            loss.append(model.loss([y_test_1],[Y_pred_1]).numpy())
        accuracy = true/len(samples)
        loss = np.mean(loss)
        return accuracy, loss

    elif isinstance(x_test, str): # Provide a path
        X_test = preprocessing_img(paths=[x_test],
                                   labels=None,
                                   train=False,
                                   resize=resize)
        Y_pred_1 = model.predict(X_test,verbose=0)[0]
        y_pred_1 = np.argmax(Y_pred_1)
        if Y_pred_1[y_pred_1] < threshold: # confidence must reach the threshold
            result = labels[fallback]
        else:
            result = labels[y_pred_1]
        if show:
            print(f'\nImage path: {x_test}')
            image = cv2.imread(x_test)
            # OpenCV loads BGR; matplotlib expects RGB.
            image = cv2.cvtColor(image,cv2.COLOR_BGR2RGB)
            plt.imshow(image)
            plt.show()
            if result != labels[fallback]:
                print(f'Predicted label: {result}')
            else:
                print(f'Cannot predict type of paper')
            print('\n')
        return result

    elif isinstance(x_test, (list, tuple)): # Provide a list of paths
        return [test(model=model, labels=labels, x_test=x_test_1,
                     threshold=threshold, show=show)
                for x_test_1 in x_test]

    else:
        raise TypeError("Wrong type x_test")

# Train

In [None]:
# Start training with every default argument of `train` (model1, the
# x_train / x_val splits, batch size 8, up to 1000 epochs).
train()

# Check Model

In [None]:
# Restore the checkpoint that `train` writes whenever the validation loss
# does not get worse (the best model, not the last one).
model1.load_weights('models/train/MobileNetV2.1.h5')

## Validation set

In [None]:
# Per-label accuracy on the validation split; every misclassified image is
# displayed together with the label the model predicted for it.
for label in x_val:
    paths = x_val[label]
    print(f'\nLabel: {label}')
    if not paths:
        # A label can end up without validation images; accuracy is then
        # undefined, so report that instead of dividing by zero.
        print('Accuracy: n/a (no images)')
        continue
    results = test(model = model1,
                   labels = labels,
                   x_test = paths,
                   threshold = 0.5)
    result = np.mean(np.array(results) == label)
    print(f'Accuracy: {result*100}%')
    for i, j in zip(paths,results):
        if j != label:
            print(f'Path: {i}')
            image = cv2.imread(i)
            # OpenCV loads BGR; matplotlib expects RGB.
            image = cv2.cvtColor(image,cv2.COLOR_BGR2RGB)
            plt.imshow(image)
            plt.show()
            print(f'Predict: {j}')

## Test set

In [None]:
# Per-label accuracy on the test split; every misclassified image is
# displayed together with the label the model predicted for it.
for label in x_test:
    paths = x_test[label]
    print(f'\nLabel: {label}')
    if not paths:
        # No test images for this label: accuracy is undefined, so report
        # that instead of dividing by zero.
        print('Accuracy: n/a (no images)')
        continue
    results = test(model = model1,
                   labels = labels,
                   x_test = paths,
                   threshold = 0.5)
    result = np.mean(np.array(results) == label)
    print(f'Accuracy: {result*100}%')
    for i, j in zip(paths,results):
        if j != label:
            print(f'Path: {i}')
            image = cv2.imread(i)
            # OpenCV loads BGR; matplotlib expects RGB.
            image = cv2.cvtColor(image,cv2.COLOR_BGR2RGB)
            plt.imshow(image)
            plt.show()
            print(f'Predict: {j}')

# Save Model

In [None]:
# Reload the best-validation checkpoint, then export the whole model to
# 'models/train/saved_model'.
# NOTE(review): with no file extension Keras presumably writes the TensorFlow
# SavedModel directory format - confirm for the installed TF/Keras version.
model1.load_weights('models/train/MobileNetV2.1.h5')
model1.save('models/train/saved_model')

# Predict