In [1]:
import os

os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
os.environ['CUDA_VISIBLE_DEVICES'] = '1'

from dataload import *
from model import *
import tensorflow as tf
import numpy as np
import time

tf.compat.v1.logging.set_verbosity(tf.compat.v1.logging.ERROR)

In [2]:
epochs = 12
train_img_dir = './dataset/train'
test_img_dir = './dataset/eval'
initial_lr = 0.1
decay_steps = 3
dropout_rate = 0.2 
lpr_len = 7
train_batch_size = 128
test_batch_size = 128
momentum = 0.9
saved_model_folder = './saved_model'
pretrained_model = ''

In [3]:
def tflite_infer(tflite_file, image_path):
    # tflite 模型推理
    interpreter = tf.lite.Interpreter(model_path=str(tflite_file))
    interpreter.allocate_tensors()

    # Get input and output tensors.
    input_details = interpreter.get_input_details()[0]
    output_details = interpreter.get_output_details()[0]
    height = input_details['shape'][1]
    width = input_details['shape'][2]

    # 单个测试样本数据
    image = cv2.imread(image_path)
    image = cv2.resize(image, (width, height))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = np.astype(image, np.float32) / 255.
    image = np.expand_dims(image, axis=0)
    image.flatten()
    interpreter.set_tensor(input_details['index'], image)

    start_time = time.time()
    interpreter.invoke()
    stop_time = time.time()

    # interpreter = tf.lite.Interpreter(model_content=tflite_model)
    input_type = interpreter.get_input_details()[0]['dtype']
    print('input: ', input_type)
    output_type = interpreter.get_output_details()[0]['dtype']
    print('output: ', output_type)

    output_data = interpreter.get_tensor(output_details['index'])
    print(output_data)
    print('time: {:.3f}ms'.format((stop_time - start_time) * 1000))
    print("model size: {:.2f} MB".format(os.path.getsize(tflite_file)/1024/1024))

In [4]:
def metric(test_dataset, lprnet):
    
    tp, tp_error = 0, 0  # tp为正确预测数量, tp_error是错误预测数量
    start_time = time.time()
    for cur_batch, (test_imgs, test_labels) in enumerate(test_dataset):
        test_labels = tf.cast(test_labels, tf.int32)
        prebs = lprnet(test_imgs)
        preb_labels = list()
        for i in range(prebs.shape[0]):
            preb = prebs[i, :, :]
            preb_label = list()
            for j in range(preb.shape[0]):
                preb_label.append(np.argmax(preb[j, :], axis=0))
            no_repeat_blank_label = list()
            pre_c = preb_label[0]
            if pre_c != len(CHARS) - 1:
                no_repeat_blank_label.append(pre_c)
            for c in preb_label:
                if (pre_c == c) or (c == len(CHARS) - 1):
                    if c == (len(CHARS) - 1):
                        pre_c = c
                    continue
                no_repeat_blank_label.append(c)
                pre_c = c
            preb_labels.append(no_repeat_blank_label)
        for i, label in enumerate(preb_labels):
            if len(label) != len(test_labels[i]):
                tp_error += 1
                continue
            if (np.asarray(test_labels[i]) == np.asarray(label)).all():
                tp += 1
            else:
                tp_error += 1
                
    end_time = time.time()
    t = end_time - start_time
    acc = tp * 1.0 / (tp + tp_error)  # 准确率
    
    return acc, tp, tp_error, t

In [4]:
# 加载训练数据集
tr_imgs, tr_labels, train_imgs_num = load_data(img_dir=train_img_dir, lpr_len=lpr_len)
tr_imgs = tr_imgs.map(preprocess)
train_dataset = tf.data.Dataset.zip((tr_imgs, tr_labels)).shuffle(train_imgs_num).batch(train_batch_size).prefetch(tf.data.experimental.AUTOTUNE).cache()

# 加载测试数据集
te_imgs, te_labels, test_imgs_num = load_data(img_dir=test_img_dir, lpr_len=lpr_len)
te_imgs = te_imgs.map(preprocess)
test_dataset = tf.data.Dataset.zip((te_imgs, te_labels)).batch(test_batch_size).prefetch(tf.data.experimental.AUTOTUNE).cache()

In [5]:
tr1_imgs, tr1_labels, train1_imgs_num = load_data(img_dir=test_img_dir, lpr_len=lpr_len)
tr1_imgs = tr1_imgs.map(preprocess, num_parallel_calls=4)
train1_dataset = tf.data.Dataset.zip((tr1_imgs, tr1_labels)).shuffle(train1_imgs_num).batch(train_batch_size).prefetch(tf.data.experimental.AUTOTUNE).cache()

te1_imgs, te1_labels, test1_imgs_num = load_data(img_dir='./lpr_quantitative_data', lpr_len=lpr_len)
te1_imgs = te1_imgs.map(preprocess, num_parallel_calls=4)
test1_dataset = tf.data.Dataset.zip((te1_imgs, te1_labels)).batch(test_batch_size).prefetch(tf.data.experimental.AUTOTUNE).cache()


In [6]:
# 训练模型
if not os.path.exists(saved_model_folder):
    os.mkdir(saved_model_folder)

# 实例化模型
lprnet = LPRNet(lpr_len=lpr_len, class_num=68, dropout_rate=0)
print("\n ********** Successful to build network! ********** \n")

# 加载预训练模型
if pretrained_model:
    lprnet.load_weights(pretrained_model)
    print("\n ********** Successful to load pretrained model! ********** \n")

# learning rate 随着 epoch 指数递减
lr_schedules = tf.keras.optimizers.schedules.ExponentialDecay(
        initial_learning_rate=initial_lr,
        decay_steps=decay_steps,
        staircase=True,
        decay_rate=0.5)

# 优化器使用RMSprop
optimizer = tf.keras.optimizers.RMSprop(learning_rate=lr_schedules, momentum=momentum)
# optimizer = tf.keras.optimizers.Adam(learning_rate=lr_schedules)

# 模型训练
top_acc = 0.
for cur_epoch in range(1, epochs + 1):
    batch = 0
    for batch_index, (train_imgs, train_labels) in enumerate(train1_dataset):
        start_time = time.time()
        with tf.GradientTape() as tape:
            train_logits = lprnet(train_imgs)
            train_labels = tf.cast(train_labels, tf.int32)
            train_logits = tf.transpose(train_logits, [2, 0, 1])
            logits_shape = train_logits.shape
            logit_length = tf.fill([logits_shape[1]], logits_shape[2])
            label_length = tf.fill([logits_shape[1]], lpr_len)
            loss = tf.nn.ctc_loss(labels=train_labels,
                                  logits=train_logits,
                                  label_length=label_length,
                                  logit_length=logit_length,
                                  logits_time_major=True,
                                  blank_index=len(CHARS) - 1)
            loss = tf.reduce_mean(loss)
        grads = tape.gradient(loss, lprnet.variables)
        optimizer.apply_gradients(grads_and_vars=zip(grads, lprnet.variables))
        end_time = time.time()
        batch = batch + int(np.shape(train_imgs)[0])
        print('\r' + "Epoch {0}/{1} || ".format(cur_epoch, epochs) 
                  + "Batch {0}/{1} || ".format(batch, train1_imgs_num)
                  + "Loss:{} || ".format(loss) 
                  + "A Batch time:{0:.4f}s || ".format(end_time - start_time)
                  + "Learning rate:{0:.8f} || ".format(np.array(lr_schedules(cur_epoch))), end=''*20)
    acc, tp, tp_error, t = metric(test1_dataset, lprnet)
    print("\n******* Prediction {0}/{1} || Acc:{2:.2f}% *******".format(tp, tp + tp_error, acc))
    print("******* Test speed: {}s 1/{} *******".format(t / (tp + tp_error), tp + tp_error))
        
    # 保存模型
    if acc >= top_acc:
        top_acc = acc
        lprnet.save(saved_model_folder, save_format='tf')

# 将.pb模型转为.tflite
cvtmodel = tf.keras.models.load_model(saved_model_folder)
converter = tf.lite.TFLiteConverter.from_keras_model(cvtmodel)
tflite_model = converter.convert()
with open('lprnet' + '{}'.format(np.around(top_acc * 100)) + '.tflite', "wb") as f:
    f.write(tflite_model)
print("\n ********** Successful to convert tflite model! ********** \n")


 ********** Successful to build network! ********** 



InvalidArgumentError: required broadcastable shapes at loc(unknown) [Op:Mul]

In [62]:
tp, tp_1, tp_2 = 0, 0, 0  # tp为正确预测数量, tp_error是错误预测数量
start_time = time.time()
for cur_batch, (test_imgs, test_labels) in enumerate(test1_dataset):
    test_labels = tf.cast(test_labels, tf.int32)
    prebs = lprnet(test_imgs)
    preb_labels = list()
    for i in range(prebs.shape[0]):
        preb = prebs[i, :, :]
        preb_label = list()
        for j in range(preb.shape[0]):
            preb_label.append(np.argmax(preb[j, :], axis=0))
        no_repeat_blank_label = list()
        pre_c = preb_label[0]
        print(pre_c)
        if pre_c != len(CHARS) - 1:
            no_repeat_blank_label.append(pre_c)
        for c in preb_label:
            if (pre_c == c) or (c == len(CHARS) - 1):
                if c == (len(CHARS) - 1):
                    pre_c = c
                continue
            no_repeat_blank_label.append(c)
            pre_c = c
        preb_labels.append(no_repeat_blank_label)
    for i, label in enumerate(preb_labels):
        print(label)
        if len(label) != len(test_labels[i]):
            tp_1 += 1
            continue
        if (np.asarray(test_labels[i]) == np.asarray(label)).all():
            tp += 1
        else:
            tp_2 += 1

print(tp_1, tp_2)
                
end_time = time.time()
t = end_time - start_time
acc = tp * 1.0 / (tp + tp_1 + tp_2)  # 准确率

67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
[]
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
67
6