<a href="https://colab.research.google.com/github/DavidRubio24/SimpleHTR/blob/master/SimpleHTR_Model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Este codigo está sacado de https://github.com/githubharald/SimpleHTR

En concreto este código es Model.py

Se le han añadido comentarios por todos lados para aclarar su funcionamiento.

Los imports:

*  __future__ trae caracteristicas de versiones futuras de python, así con python 2 este código también funciona.
*  NumPy se usa para tratar vectores y matrices eficientemente.
*  TensorFlow se usa para redes neuronales.

In [0]:
from __future__ import division
from __future__ import print_function

import numpy as np
import tensorflow as tf

Definimos un enumerado para identificar el algoritmo con el que e¡decidiremos cual es la palabras más razonable que cuadra.

In [0]:
class DecoderType:
  BestPath = 0
  BeamSearch = 1
  WordBeamSearch = 2

La clase Model define una funcion parametrizable que hace predicciones. Entrenar el modelo consiste en elegir los paramaetros adecuados.

Primero definimos unas constantes:

*  Cantidad de ejemplos que usareos a la vez para entrenar el modelo.
*  Tamaño de la imagen que aceptaremos. Se puede rellenar con blanco si la proporción no cuadra.
*  Cantidad máxima de acracteres que nuestro modelo puede predecir que tiene la imágen.

In [0]:
class Model:
  "minimalistic TensorFlow model for Handwitten Text Recognition"

  # model constants
  batchSize = 50
  imgSize = (128, 32)
  maxTextLen = 32

(Para definir las funciones por separado en este Notebook pondremos *class Model(Model):* al principio de cada celda, es un truco un poco sucio, pero no hay otra forma mejor: https://github.com/jupyter/notebook/issues/1243) 

Se inicializa el modelo.

En todo este documento no se realiza ninguna operación. Solo se crea un grafo de computación que se ejecutará desde main.py.

In [0]:
class Model(Model):
  def __init__(self, charList, decoderType=DecoderType.BestPath, mustRestore=False):
    """ init model: add CNN, RNN and CTC and initialize TensorFlow

        Hay que especificar los caracteres que queremos reconocer,
        el tipo de busqueda en diccionario, si queremos seguir
        entrenando (mustRestore=True) o empezar de cero y ¿snapID?
    """
    self.charList = charList
    self.decoderType = decoderType
    self.mustRestore = mustRestore
    self.snapID = 0

    # Whether to use normalization over a batch or a population
    self.is_train = tf.placeholder(tf.bool, name='is_train') # Los placeholders son datos que se proporcionarán más tarde, al hacer session.run(, feed_dict=)

    # input image batch
    self.inputImgs = tf.placeholder(tf.float32, shape=(None, Model.imgSize[0], Model.imgSize[1]))

    # setup CNN, RNN and CTC
    # Estas lineas son las importantes. Las funciones están explcadas más abajo.
    self.setupCNN()
    self.setupRNN()
    self.setupCTC()

    # setup optimizer to train NN
    self.batchesTrained = 0
    self.learningRate   = tf.placeholder(tf.float32, shape=[])
    
    # Las siguientes 2 lineas son unas dependencias que hay que procesar antes del algortmo de optimización
    self.update_ops     = tf.get_collection(tf.GraphKeys.UPDATE_OPS) 
    with tf.control_dependencies(self.update_ops):
      # Se optimiza loss con el algoritmo RMSPropOptimizer y ratio de aprendizaje self.learningRate
      # Esta optimización irá cambiando los parametros del modelo para reducir loss
      self.optimizer = tf.train.RMSPropOptimizer(self.learningRate).minimize(self.loss)

    # initialize TF
    (self.sess, self.saver) = self.setupTF()

Las redes neuronales convolucionales (CNN) se usan para visión artificial.

A partir de la imágen original extraen ciertar caracteristicas pasandoles filtros (kernels) de distintos tamaños (tipocamente 3x3, 5x5 o incluso 7x7).
Cada filtro genera un canal de una imagen nueva en la que se destacan las caracteristicas que buscaba ese filtro.

In [0]:
class Model(Model):
  def setupCNN(self):
    "create CNN layers and return output of these layers"
    # La imagen tiene 2 dimensiones, pero tenemos que añadirle una más porque vamos a trabajar con varios canales
    cnnIn4d = tf.expand_dims(input=self.inputImgs, axis=3)

    # list of parameters for the layers
    kernelVals = [5, 5, 3, 3, 3]              # Tamaño de los filtros (nº de pixels que tiene el lado)
    featureVals = [1, 32, 64, 128, 128, 256]  # Nº de canales que leen los filtros de la i-esima layer
    strideVals = poolVals = [(2,2), (2,2), (1,2), (1,2), (1,2)]  #Cada cuantos pixels se aplica el filtro
    numLayers = len(strideVals)  # Nº de capas de filtros

    # Create layers
    pool = cnnIn4d # input to first CNN layer
    
    for i in range(numLayers):
    # Por cada layer:
      # Generamos aleatoriamente featureVals[i + 1] filtros de kernelVals[i]xkernelVals[i] que toman una imágen de featureVals[i] canales.
      random_values_for_kernel = tf.truncated_normal([kernelVals[i], kernelVals[i], featureVals[i], featureVals[i + 1]], stddev=0.1)
      
      # Creamos una Variable (las Variables son los parametros del modelo)
      kernel = tf.Variable(random_values_for_kernel)
      
      # Metemos la entrada pool a una layer convolucional con los filtros (kernel) generados
      # Para que el tamaño de la imagen siga siendo el mismo ('SAME') se añaden pixels por los bordes (padding)
      # Los filtros los aplican de pixel en pixel en cada dimensión (no termino de entender la 4ª dim., debe ser la de los filtros pero no veo como cuadra)
      conv = tf.nn.conv2d(pool, kernel, padding='SAME',  strides=(1,1,1,1))
      
      # Despues de la convlución se normalizan los datos
      conv_norm = tf.layers.batch_normalization(conv, training=self.is_train)
      
      # Se le aplica una no linearidad a los resultados
      relu = tf.nn.relu(conv_norm)
      
      # Se reduce la imagen quedandonos con el mayor pixel de cada rectangulo de tamaño poolVals[i]
      pool = tf.nn.max_pool(relu, (1, poolVals[i][0], poolVals[i][1], 1), (1, strideVals[i][0], strideVals[i][1], 1), 'VALID')

    # Salida de la CNN
    self.cnnOut4d = pool

Las Redes Neuronares Recurrentes (RNN) son capaces de entender contexto.

Las más usadas son las Long-Short Term Memory que tienen en cuenta tanto el contexto en general como el contexto muy reciente. Aquí usaremos una bidireccional, para que entienda tanto el contexto de lo que ha pasado como el contexto de lo que está por venir.

In [0]:
class Model(Model):
  def setupRNN(self):
    "create RNN layers and return output of these layers"
    
    # Se elimina la dimensión 2 del tensor, que solo tiene tamaño uno
    rnnIn3d = tf.squeeze(self.cnnOut4d, axis=[2])

    # basic cells which is used to build RNN
    # TODO: estudiar si las CUDNNLSTM se pueden usar aquí para mejorar eficiencia
    numHidden = 256
    cells = [tf.contrib.rnn.LSTMCell(num_units=numHidden, state_is_tuple=True) for _ in range(2)] # 2 layers

    # stack basic cells
    stacked = tf.contrib.rnn.MultiRNNCell(cells, state_is_tuple=True)

    # Creamos una LSTM bidireccional cuya entrada es de tamaño (nº de ejemplos por batch)x(Tiempo)x(nº de caracteristicas)
    # BxTxF -> BxTx2H
    ((fw, bw), _) = tf.nn.bidirectional_dynamic_rnn(cell_fw=stacked, cell_bw=stacked, inputs=rnnIn3d, dtype=rnnIn3d.dtype)
    
    # Concatenamos las salidas de la BLSTM y añadimos una dimension (why?)
    # BxTxH + BxTxH -> BxTx2H -> BxTx1X2H
    concat = tf.expand_dims(tf.concat([fw, bw], 2), 2)
                  
    # Pasamos un filtro que a cada instante de tiempo (parte de la imagen) le asigna una letra o un carater especial '-'
    # project output to chars (including blank): BxTx1x2H -> BxTx1xC -> BxTxC
    kernel = tf.Variable(tf.truncated_normal([1, 1, numHidden * 2, len(self.charList) + 1], stddev=0.1))
    self.rnnOut3d = tf.squeeze(tf.nn.atrous_conv2d(value=concat, filters=kernel, rate=1, padding='SAME'), axis=[2])

Varias partes de la imagen se convertirán en la misma letra, los espacios entre las letras se convertirán en el caracter especial '-'.

Para hacer cuadrar esta salida con palabras se usa una Connectionist Temporal Classification que decide que palabras tienen más probabilidades.

Esta red está especialmente pensada para poder entrenar la red.

In [0]:
class Model(Model):
  def setupCTC(self):
    "create CTC loss and decoder and return them"
    # BxTxC -> TxBxC
    self.ctcIn3dTBC = tf.transpose(self.rnnOut3d, [1, 0, 2])
    
    # ground truth text as sparse tensor
    # Texto real etiquetado (bueno, un placeholder para ello)
    self.gtTexts = tf.SparseTensor(tf.placeholder(tf.int64, shape=[None, 2]) , tf.placeholder(tf.int32, [None]), tf.placeholder(tf.int64, [2]))

    # calc loss for batch
    self.seqLen = tf.placeholder(tf.int32, [None])
    # Se calcula el error medio cometido en la predicción
    # Esta loss es la que se optimiza en __init__
    self.loss = tf.reduce_mean(tf.nn.ctc_loss(labels=self.gtTexts, inputs=self.ctcIn3dTBC, sequence_length=self.seqLen, ctc_merge_repeated=True))

    # calc loss for each element to compute label probability
    self.savedCtcInput  = tf.placeholder(tf.float32, shape=[Model.maxTextLen, None, len(self.charList) + 1]) # Esto es basicamente self.ctcIn3dTBC
    self.lossPerElement = tf.nn.ctc_loss(labels=self.gtTexts, inputs=self.savedCtcInput, sequence_length=self.seqLen, ctc_merge_repeated=True)

    # Busca la palabra más razonalble basandos en la predicción hecha hasta ahora
    # decoder: either best path decoding or beam search decoding
    if self.decoderType == DecoderType.BestPath:
      self.decoder = tf.nn.ctc_greedy_decoder(inputs=self.ctcIn3dTBC, sequence_length=self.seqLen)
    elif self.decoderType == DecoderType.BeamSearch:
      self.decoder = tf.nn.ctc_beam_search_decoder(inputs=self.ctcIn3dTBC, sequence_length=self.seqLen, beam_width=50, merge_repeated=False)
    elif self.decoderType == DecoderType.WordBeamSearch:
      # import compiled word beam search operation (see https://github.com/githubharald/CTCWordBeamSearch)
      word_beam_search_module = tf.load_op_library('TFWordBeamSearch.so')

      # prepare information about language (dictionary, characters in dataset, characters forming words) 
      chars = str().join(self.charList)
      wordChars = open('../model/wordCharList.txt').read().splitlines()[0] # Documento con todas las letras
      corpus = open('../data/corpus.txt').read()  # En este documento hay mucho texto en inglés.

      # decode using the "Words" mode of word beam search
      self.decoder = word_beam_search_module.word_beam_search(tf.nn.softmax(self.ctcIn3dTBC, dim=2), 50, 'Words', 0.0, corpus.encode('utf8'), chars.encode('utf8'), wordChars.encode('utf8'))


In [0]:
class Model(Model):
  def setupTF(self):
    """
    initialize TF
    
    Crea una sesion de TensorFlow e inicializa los parámetros (aleatoriamente o partir de un fichero).
    Devuelve la sesion y un saver para guardar los parámetros.
    """
    
    print('Tensorflow: ' + tf.__version__)

    # Crea una sesion TensorFlow
    # Las sesiones se necesitan para cmputar los grafos de computación generados
    sess = tf.Session() # TF session

    # Para guardar los parametros del modelo necesita un Saver
    saver = tf.train.Saver(max_to_keep=1) # saver saves model to file
    modelDir = '../model/'
    latestSnapshot = tf.train.latest_checkpoint(modelDir) # is there a saved model?

    # if model must be restored (for inference), there must be a snapshot
    if self.mustRestore and not latestSnapshot:
      raise Exception('No saved model found in: ' + modelDir)

    # load saved model if available
    if latestSnapshot:
      print('Init with stored values from ' + latestSnapshot)
      saver.restore(sess, latestSnapshot)
    else:
      print('Init with new values')
      sess.run(tf.global_variables_initializer())

    return (sess, saver)

In [0]:
class Model(Model):
  def toSparse(self, texts):
    "put ground truth texts into sparse tensor for ctc_loss"
    # Los sparse tensors solo guardan unos pocos valores del tensor con sus indices y asumen que el resto son  0s
    # La CTC solo admite labels en este formato (las CTC dejan bastante que desear en TF/Keras)
    # La conversión a tf.sparse.SparseTensor se hace en setupCTC (no me parece buena idea, no se porque no se puede hacer aquí)
    indices = []
    values  = []
    shape   = [len(texts), 0] # last entry must be max(labelList[i])

    # go over all texts
    for (batchElement, text) in enumerate(texts):
      # convert to string of label (i.e. class-ids)
      labelStr = [self.charList.index(c) for c in text]
      
      # sparse tensor must have size of max. label-string
      # Calculamos el tamaño máximo de las labelStr y lo guardamos en shape
      if len(labelStr) > shape[1]:
        shape[1] = len(labelStr)

      # put each label into sparse tensor
      for (i, label) in enumerate(labelStr):
        indices.append([batchElement, i])
        values.append(label)

    return (indices, values, shape)

In [0]:
class Model(Model):
  def decoderOutputToText(self, ctcOutput, batchSize):
    "extract texts from output of CTC decoder"
    
    # contains string of labels for each batch element
    encodedLabelStrs = [[] for i in range(batchSize)]

    # word beam search: label strings terminated by blank
    if self.decoderType == DecoderType.WordBeamSearch: 
      # Se eliminan los blanks detectados
      blank = len(self.charList)
      for batch in range(batchSize):
        for label in ctcOutput[batch]:
          if label == blank:
            break
          encodedLabelStrs[batch].append(label)

    # TF decoders: label strings are contained in sparse tensor
    else:
      # ctc returns tuple, first element is SparseTensor 
      decoded = ctcOutput[0][0]

      # go over all indices and save mapping: batch -> values
      idxDict = { batch : [] for batch in range(batchSize) } #TODO: remove
      for (idx, idx2d) in enumerate(decoded.indices):
        label = decoded.values[idx]
        batchElement = idx2d[0] # index according to [b,t]
        encodedLabelStrs[batchElement].append(label)

    # map labels to chars for all batch elements
    return [str().join([self.charList[c] for c in labelStr]) for labelStr in encodedLabelStrs]

In [0]:
class Model(Model):
  def trainBatch(self, batch):
    "feed a batch into the NN to train it"
    # Nº de elementos por batch
    numBatchElements = len(batch.imgs)
    
    # Etiquetas en formato sparse (todavia no convertido a SparseTensor)
    sparseGT = self.toSparse(batch.gtTexts)
    
    # Ratio de aprendizaje. Decrece conforme entrenamos el modelo.
    rate = 0.01 if self.batchesTrained < 10 else (0.001 if self.batchesTrained < 10000 else 0.0001) # decay learning rate
    
    # Diccionario indicando el valor de todos los placeholders que necesitamos para ejecutar self.optimizer y self.loss
    feedDict = {self.inputImgs : batch.imgs,                         # Imágenes de ejemplo para entrenar
                self.gtTexts : sparseGT,                             # Etiquetas de los ejemplos
                self.seqLen : [Model.maxTextLen] * numBatchElements, # Tamaño de las predicciones
                self.learningRate : rate,                            # Ratio de aprendizaje
                self.is_train: True}                                 # Si estamos entrenando o no
    
    # Se ejecuta (aquí sí, ya no estamos creando el grafo de computación) la optimización y la precisión de la predicción
    ( _ , lossVal) = self.sess.run([self.optimizer, self.loss], feedDict)
    
    self.batchesTrained += 1
    
    return lossVal

In [0]:
class Model(Model):
  def inferBatch(self, batch, calcProbability=False, probabilityOfGT=False):
    "feed a batch into the NN to recognize the texts"

    # decode, optionally save RNN output
    # Nº de elementos por batch
    numBatchElements = len(batch.imgs)

    # Diccionario indicando el valor de todos los placeholders que necesitamos para ejecutar self.decoder y self.ctcIn3dTBC
    feedDict = {self.inputImgs : batch.imgs,
                self.seqLen : [Model.maxTextLen] * numBatchElements,
                self.is_train: False}

    # Ejecutamos los grafos de computación
    evalRes = self.sess.run([self.decoder, self.ctcIn3dTBC], feedDict) # TODO: self.ctcIn3dTBC solo necesita ejecutarse si calcProbability

    # Extraemos el texto de los resultados
    texts = self.decoderOutputToText(evalRes[0], numBatchElements)

    # feed RNN output and recognized text into CTC loss to compute labeling probability
    probs = None
    if calcProbability:
      sparse = self.toSparse(batch.gtTexts) if probabilityOfGT else self.toSparse(texts)
      ctcInput = evalRes[1]

      # Diccionario indicando el valor de todos los placeholders que necesitamos para ejecutar self.lossPerElement
      feedDict = {self.savedCtcInput : ctcInput,
                  self.gtTexts : sparse,
                  self.seqLen : [Model.maxTextLen] * numBatchElements,
                  self.is_train: False}

      lossVals = self.sess.run(self.lossPerElement, feedDict)

      probs = np.exp(-lossVals)

    return (texts, probs)

In [0]:
class Model(Model):
  def save(self):
    "save model to file"
    self.snapID += 1
    self.saver.save(self.sess, '../model/snapshot', global_step=self.snapID)