In [1]:
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
tf CNN+LSTM+CTC 训练识别不定长数字字符图片

@author: ygwu
"""
from genIDCard  import *
import numpy as np
import time 
import os
os.environ['TF_CPP_MIN_LOG_LEVEL']='2'
import tensorflow as tf



class FError(Exception):
    """Notebook-specific exception type; adds no behaviour on top of Exception."""

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

# --- Image and network dimensions ---
# Image size as (height, width): 32 x 256.
OUTPUT_SHAPE = (32, 256)
# Size of the LSTM cell's hidden (h) output.
num_hidden = 64
# Number of LSTM layers (a single layer).
num_layers = 1

# --- Label alphabet ---
# ID-card generator instance; its `len` attribute sizes the label set.
obj = gen_id_card()
# Per the original note: the digits + blank + CTC blank.
num_classes = obj.len + 1 + 1
DIGITS = '0123456789'

# --- Optimisation schedule ---
# Maximum number of training epochs.
num_epochs = 10000
# Initial learning rate.
INITIAL_LEARNING_RATE = 1e-3
# The learning rate is decayed once every 5000 steps ...
DECAY_STEPS = 5000
# ... by this factor.
LEARNING_RATE_DECAY_FACTOR = 0.9
MOMENTUM = 0.9
# Report progress every 100 steps.
REPORT_STEPS = 100

# --- Batching ---
BATCHES = 10
BATCH_SIZE = 64
TRAIN_SIZE = BATCHES * BATCH_SIZE
def decode_sparse_tensor(sparse_tensor):
    """Convert a sparse (indices, values, shape) triple into a list of sequences.

    Args:
        sparse_tensor: indexable triple where element 0 holds the (row, col)
            coordinates, element 1 the values and element 2 the dense shape.

    Returns:
        A list with one decoded sequence (as produced by `decode_a_seq`) per
        row of the dense shape. Rows without any entry yield an empty list, so
        the result stays aligned with the batch even when a sample decodes to
        nothing.
    """
    # Start from the declared number of rows so that rows with no entries are
    # still represented (grouping by index runs alone would drop them).
    offsets_per_row = [[] for _ in range(int(sparse_tensor[2][0]))]
    for offset, coordinate in enumerate(sparse_tensor[0]):
        row = int(coordinate[0])
        # Tolerate a dense shape that under-reports the number of rows.
        while len(offsets_per_row) <= row:
            offsets_per_row.append([])
        # `offset` is the position inside the flat values array.
        offsets_per_row[row].append(offset)
    return [decode_a_seq(offsets, sparse_tensor) for offsets in offsets_per_row]
# 
def decode_a_seq(indexes, spars_tensor):
    """Translate value offsets of a sparse triple into digit characters.

    Args:
        indexes: offsets into the values array (element 1 of `spars_tensor`).
        spars_tensor: sparse (indices, values, shape) triple.

    Returns:
        A list with the `DIGITS` character for each referenced value.
    """
    return [DIGITS[spars_tensor[1][offset]] for offset in indexes]
def report_accuracy(decoded_list, test_targets):
    """Print a per-sample comparison and return the exact-match accuracy.

    Args:
        decoded_list: sparse triple holding the decoder output.
        test_targets: sparse triple holding the ground-truth labels.

    Returns:
        The fraction of samples whose decoded sequence equals the target, or
        -1 when the two sides decode to a different number of sequences.
    """
    expected = decode_sparse_tensor(test_targets)
    predicted = decode_sparse_tensor(decoded_list)

    # Nothing sensible can be compared when the counts disagree.
    if len(expected) != len(predicted):
        print("len(original_list)", len(expected), "len(detected_list)", len(predicted),
              " test and detect length desn't match")
        return -1

    print("T/F: original(length) <-------> detectcted(length)")
    hits = 0
    for truth, guess in zip(expected, predicted):
        matched = (truth == guess)
        print(matched, truth, "(", len(truth), ") <-------> ", guess, "(", len(guess), ")")
        if matched:
            hits += 1

    Accuracy = hits * 1.0 / len(expected)
    print("Test Accuracy:", Accuracy)
    return Accuracy
def sparse_tuple_from(sequences, dtype=np.int32):
    """Build a sparse (indices, values, shape) representation of `sequences`.

    Args:
        sequences: a list of sequences (lists or 1-D arrays) of labels.
        dtype: NumPy dtype used for the values array.

    Returns:
        A tuple (indices, values, shape) where
        - indices is an int64 array of shape (N, 2) holding the (row, column)
          position of every element,
        - values is a 1-D array of `dtype` with the elements in the same order,
        - shape is the int64 dense shape [len(sequences), longest sequence].
        An empty batch, or a batch made only of empty sequences, yields
        indices of shape (0, 2) and a column count of 0.
    """
    indices = []
    values = []
    max_len = 0

    for row, seq in enumerate(sequences):
        length = len(seq)
        # One (row, column) coordinate per element of this sequence.
        indices.extend((row, col) for col in range(length))
        values.extend(seq)
        max_len = max(max_len, length)

    # reshape(-1, 2) keeps the array two-dimensional even with no elements.
    indices = np.asarray(indices, dtype=np.int64).reshape(-1, 2)
    values = np.asarray(values, dtype=dtype)
    # The longest sequence length is tracked directly instead of being read
    # back from the indices, which fails when there are none.
    shape = np.asarray([len(sequences), max_len], dtype=np.int64)

    return indices, values, shape
    
# Thin wrappers with simplified parameters and shorter names.
def weight_variable(shape):
    """Return a tf.Variable of `shape` initialised from a truncated normal (stddev 0.5)."""
    return tf.Variable(tf.truncated_normal(shape, stddev=0.5))
 
def bias_variable(shape):
    """Return a tf.Variable of `shape` with every element initialised to 0.1."""
    return tf.Variable(tf.constant(0.1, shape=shape))
 
def conv2d(x, W, stride=(1, 1), padding='SAME'):
    """Apply tf.nn.conv2d to `x` with filter `W`.

    `stride` supplies the two middle entries of the 4-element strides list;
    the first and last entries are fixed to 1.
    """
    strides = [1, stride[0], stride[1], 1]
    return tf.nn.conv2d(x, W, strides=strides, padding=padding)
 
def max_pool(x, ksize=(2, 2), stride=(2, 2)):
    """Apply tf.nn.max_pool to `x` with 'SAME' padding.

    `ksize` and `stride` supply the two middle entries of the 4-element
    window and stride lists; the outer entries are fixed to 1.
    """
    window = [1, ksize[0], ksize[1], 1]
    strides = [1, stride[0], stride[1], 1]
    return tf.nn.max_pool(x, ksize=window, strides=strides, padding='SAME')
 
def avg_pool(x, ksize=(2, 2), stride=(2, 2)):
    """Apply tf.nn.avg_pool to `x` with 'SAME' padding.

    `ksize` and `stride` supply the two middle entries of the 4-element
    window and stride lists; the outer entries are fixed to 1.
    """
    window = [1, ksize[0], ksize[1], 1]
    strides = [1, stride[0], stride[1], 1]
    return tf.nn.avg_pool(x, ksize=window, strides=strides, padding='SAME')

def get_a_image():
    """Generate a single sample together with its original image.

    Returns:
        A tuple (inputs, sparse_targets, seq_len, image) where
        - inputs is a float array of shape (1, OUTPUT_SHAPE[1], OUTPUT_SHAPE[0])
          holding the transposed image,
        - sparse_targets is the label converted by `sparse_tuple_from`,
        - seq_len is a length-1 array filled with OUTPUT_SHAPE[1],
        - image is the object returned by the generator, unchanged.
    """
    generator = gen_id_card()
    height, width = OUTPUT_SHAPE[0], OUTPUT_SHAPE[1]

    # Batch of one, laid out as (batch, width, height).
    inputs = np.zeros([1, width, height])

    image, text, _ = generator.gen_image()
    # Flat image -> (height, width) -> transposed to (width, height).
    inputs[0, :] = np.transpose(image.reshape((height, width)))

    # Single label sequence, converted to the sparse triple form.
    sparse_targets = sparse_tuple_from([np.asarray(list(text))])

    # Every sample spans the full image width.
    seq_len = np.ones(inputs.shape[0]) * width
    return inputs, sparse_targets, seq_len, image
 
def get_next_batch(batch_size=128):
    """Generate one batch of synthetic samples.

    Args:
        batch_size: number of images to generate.

    Returns:
        A tuple (inputs, sparse_targets, seq_len) where
        - inputs is a float array of shape
          (batch_size, OUTPUT_SHAPE[1], OUTPUT_SHAPE[0]) holding the
          transposed images,
        - sparse_targets is the labels converted by `sparse_tuple_from`,
        - seq_len is a (batch_size,) array filled with OUTPUT_SHAPE[1].
    """
    generator = gen_id_card()
    height, width = OUTPUT_SHAPE[0], OUTPUT_SHAPE[1]

    # Laid out as (batch, width, height).
    inputs = np.zeros([batch_size, width, height])
    targets = []

    for row in range(batch_size):
        # The generator returns the image data together with its label text.
        image, text, _ = generator.gen_image()
        # Flat image -> (height, width) -> transposed to (width, height).
        inputs[row, :] = np.transpose(image.reshape((height, width)))
        targets.append(np.asarray(list(text)))

    # Labels as a sparse (indices, values, shape) triple.
    sparse_targets = sparse_tuple_from(targets)
    # Every sample spans the full image width.
    seq_len = np.ones(inputs.shape[0]) * width
    return inputs, sparse_targets, seq_len
    


def get_train_model():
    """Build the LSTM + linear projection graph that produces per-step logits.

    Creates the placeholders, runs a single LSTM cell over the inputs with
    tf.nn.dynamic_rnn and projects every time step to `num_classes` scores.

    Returns:
        A tuple (logits, inputs, targets, seq_len, W, b) where
        - logits is the time-major score tensor [time, batch, num_classes],
        - inputs is the float32 placeholder [batch, time, OUTPUT_SHAPE[0]],
        - targets is the int32 sparse placeholder for the labels,
        - seq_len is the int32 placeholder [batch] with the sequence lengths,
        - W, b are the projection weight and bias variables.
    """
    # Input placeholder: (batch, time, OUTPUT_SHAPE[0]); batch and time are dynamic.
    inputs = tf.placeholder(tf.float32, [None, None, OUTPUT_SHAPE[0]])
    
    # Sparse placeholder for the labels, as required by the CTC loss.
    targets = tf.sparse_placeholder(tf.int32)
    
    # 1-D vector of sequence lengths, shape [batch_size].
    seq_len = tf.placeholder(tf.int32, [None])
    
    # Define the LSTM network.
    cell = tf.contrib.rnn.LSTMCell(num_hidden, state_is_tuple=True)
    # NOTE: `stack` is built but never used; dynamic_rnn below is given `cell`.
    # NOTE(review): switching to `stack` would presumably change the variable
    # names and break restoring existing checkpoints - confirm before changing.
    stack = tf.contrib.rnn.MultiRNNCell([cell] * num_layers, state_is_tuple=True)
    outputs, _ = tf.nn.dynamic_rnn(cell, inputs, seq_len, dtype=tf.float32)
    
    shape = tf.shape(inputs)
    batch_s, max_timesteps = shape[0], shape[1]  # dynamic batch size and number of time steps
    
    # Flatten batch and time so a single matmul projects every time step.
    outputs = tf.reshape(outputs, [-1, num_hidden])
    W = tf.Variable(tf.truncated_normal([num_hidden,  # projection: num_hidden -> num_classes
                                          num_classes],
                                         stddev=0.1), name="W")
    b = tf.Variable(tf.constant(0., shape=[num_classes]), name="b")  # one bias per class
    # outputs x weights + bias
    logits = tf.matmul(outputs, W) + b
    # Back to [batch, time, num_classes].
    logits = tf.reshape(logits, [batch_s, -1, num_classes])
    # Swap the first two axes: [time, batch, num_classes].
    logits = tf.transpose(logits, (1, 0, 2))
    return logits, inputs, targets, seq_len, W, b

def crack_image(test_inputs, test_targets, test_seq_len, model_path="./ocr.model-1200"):
    """Restore a trained checkpoint, decode the given samples and report accuracy.

    Args:
        test_inputs: input array fed to the model's `inputs` placeholder.
        test_targets: sparse (indices, values, shape) triple with the labels.
        test_seq_len: per-sample sequence lengths fed to `seq_len`.
        model_path: checkpoint prefix handed to `tf.train.Saver.restore`.

    The comparison between decoded and expected sequences is printed by
    `report_accuracy`; nothing is returned.
    """
    # Build everything in a private graph. Building in the process-wide default
    # graph means a second call (e.g. re-running the notebook cell) collides
    # with the variables created by the first one.
    with tf.Graph().as_default():
        # Not used for inference; kept so the set of variables handed to the
        # Saver stays the same as before.
        global_step = tf.Variable(0, trainable=False)

        logits, inputs, targets, seq_len, W, b = get_train_model()

        decoded, log_prob = tf.nn.ctc_beam_search_decoder(logits, seq_len, merge_repeated=False)

        # Despite the name, this is the mean edit distance between the decoded
        # sequences and the targets, not an accuracy.
        acc = tf.reduce_mean(tf.edit_distance(tf.cast(decoded[0], tf.int32), targets))

        saver = tf.train.Saver()
        with tf.Session() as session:
            # Variables are restored from the checkpoint, so no initialiser is run.
            saver.restore(session, model_path)
            test_feed = {inputs: test_inputs,
                         targets: test_targets,
                         seq_len: test_seq_len}
            dd, log_probs, accuracy = session.run([decoded[0], log_prob, acc], test_feed)
            report_accuracy(dd, test_targets)

  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])
  _np_qint8 = np.dtype([("qint8", np.int8, 1)])
  _np_quint8 = np.dtype([("quint8", np.uint8, 1)])
  _np_qint16 = np.dtype([("qint16", np.int16, 1)])
  _np_quint16 = np.dtype([("quint16", np.uint16, 1)])
  _np_qint32 = np.dtype([("qint32", np.int32, 1)])
  np_resource = np.dtype([("resource", np.ubyte, 1)])


In [2]:
# Generate a batch of one synthetic sample, then print the raw input array and
# the label decoded back from its sparse form.
inputs, sparse_targets,seq_len = get_next_batch(1)
print(inputs)
print(decode_sparse_tensor(sparse_targets))

[[[0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  ...
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]
  [0. 0. 0. ... 0. 0. 0.]]]
[['6', '3', '9', '2', '5', '5', '5', '1', '0', '4', '8', '0', '5', '0', '0']]


In [3]:
# Run the restored model on the sample generated in the previous cell.
crack_image(inputs, sparse_targets,seq_len)

The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.

Instructions for updating:
This class is equivalent as tf.keras.layers.LSTMCell, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
This class is equivalent as tf.keras.layers.StackedRNNCells, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
Please use `keras.layers.RNN(cell)`, which is equivalent to this API
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Instructions for updating:
Call initializer instance with the dtype argument instead of passing it to the constructor
Instructions for updating:
Use tf.where