In [3]:
# Import Google Drive and mount
from google.colab import drive
drive.mount('/content/drive')

root_path = '/content/drive/My Drive/Colab Notebooks/NLP/Assignment1'

ExpDir = '{root}/OCR-System/'.format(root=root_path)
print('Experiment Dir is:' + ExpDir)
modelDir = '{ocr}/model/'.format(ocr=ExpDir)
print('Model is in:' + modelDir)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Experiment Dir is:/content/drive/My Drive/Colab Notebooks/NLP/Assignment1/OCR-System/
Model is in:/content/drive/My Drive/Colab Notebooks/NLP/Assignment1/OCR-System//model/


In [0]:
import random
import sys
import numpy as np
import cv2
import editdistance
import tensorflow as tf

In [0]:
class FilePaths:
    """filenames and paths to data"""
    fnCharList = modelDir + 'charList.txt'
    fnAccuracy = modelDir + 'accuracy.txt'
    fnInfer = ExpDir + 'test.png'

In [0]:
class Batch:
    """batch containing images and ground truth texts"""

    def __init__(self, gtTexts, imgs):
        self.imgs = np.stack(imgs, axis=0)
        self.gtTexts = gtTexts


class DecoderType:
    BestPath = 0
    BeamSearch = 1
    WordBeamSearch = 2

In [0]:
def preprocess(img, imgSize, dataAugmentation=False):
    """put img into target img of size imgSize, transpose for TF and normalize
    gray-values"""
    # there are damaged files in IAM dataset - just use black image instead
    if img is None:
        img = np.zeros([imgSize[1], imgSize[0]])

    # increase dataset size by applying random stretches to the images
    if dataAugmentation:
        stretch = (random.random() - 0.5)  # -0.5 .. +0.5
        # random width, but at least 1
        wStretched = max(int(img.shape[1] * (1 + stretch)), 1)
        # stretch horizontally by factor 0.5 .. 1.5
        img = cv2.resize(img, (wStretched, img.shape[0]))

    # create target image and copy sample image into it
    (wt, ht) = imgSize
    (h, w) = img.shape
    fx = w / wt
    fy = h / ht
    f = max(fx, fy)
    # scale according to f (result at least 1 and at most wt or ht)
    newSize = (max(min(wt, int(w / f)), 1), max(min(ht, int(h / f)), 1))
    img = cv2.resize(img, newSize)
    target = np.ones([ht, wt]) * 255
    target[0:newSize[1], 0:newSize[0]] = img

    # transpose for TF
    img = cv2.transpose(target)

    # normalize
    (m, s) = cv2.meanStdDev(img)
    m = m[0][0]
    s = s[0][0]
    img = img - m
    img = img / s if s > 0 else img
    return img


def train(model, loader):
    "train NN"
    # number of training epochs since start
    epoch = 0
    # best valdiation character error rate
    bestCharErrorRate = float('inf')
    # number of epochs no improvement of character error rate occured
    noImprovementSince = 0
    # stop training after this number of epochs without improvement
    earlyStopping = 5
    while True:
        epoch += 1
        print('Epoch:', epoch)

        # train
        print('Train NN')
        loader.trainSet()
        while loader.hasNext():
            iterInfo = loader.getIteratorInfo()
            batch = loader.getNext()
            loss = model.trainBatch(batch)
            print('Batch:', iterInfo[0], '/', iterInfo[1], 'Loss:', loss)

        # validate
        charErrorRate = validate(model, loader)

        # if best validation accuracy so far, save model parameters
        if charErrorRate < bestCharErrorRate:
            print('Character error rate improved, save model')
            bestCharErrorRate = charErrorRate
            noImprovementSince = 0
            model.save()
            open(FilePaths.fnAccuracy, 'w').write(
                'Validation character error rate of saved model: %f%%' % (
                        charErrorRate * 100.0))
        else:
            print('Character error rate not improved')
            noImprovementSince += 1

        # stop training if no more improvement in the last x epochs
        if noImprovementSince >= earlyStopping:
            print('No more improvement since %d epochs. Training '
                  'stopped.' % earlyStopping)
            break

In [0]:
def validate(model, loader):
    "validate NN"
    print('Validate NN')
    loader.validationSet()
    numCharErr = 0
    numCharTotal = 0
    numWordOK = 0
    numWordTotal = 0
    while loader.hasNext():
        iterInfo = loader.getIteratorInfo()
        print('Batch:', iterInfo[0], '/', iterInfo[1])
        batch = loader.getNext()
        recognized = model.inferBatch(batch)

        print('Ground truth -> Recognized')
        for i in range(len(recognized)):
            numWordOK += 1 if batch.gtTexts[i] == recognized[i] else 0
            numWordTotal += 1
            dist = editdistance.eval(recognized[i], batch.gtTexts[i])
            numCharErr += dist
            numCharTotal += len(batch.gtTexts[i])
            print('[OK]' if dist == 0 else '[ERR:%d]' % dist,
                  '"' + batch.gtTexts[i] + '"', '->',
                  '"' + recognized[i] + '"')

    # print validation result
    charErrorRate = numCharErr / numCharTotal
    wordAccuracy = numWordOK / numWordTotal
    print('Character error rate: %f%%. Word accuracy: %f%%.' % (
        charErrorRate * 100.0, wordAccuracy * 100.0))
    return charErrorRate

In [0]:
class Model:
    "minimalistic TF model for HTR"

    # model constants
    batchSize = 50
    imgSize = (128, 32)
    maxTextLen = 32

    def __init__(self, charList, decoderType=DecoderType.BestPath,
                 mustRestore=False):
        "init model: add CNN, RNN and CTC and initialize TF"
        self.charList = charList
        self.decoderType = decoderType
        self.mustRestore = mustRestore
        self.snapID = 0

        # CNN
        self.inputImgs = tf.placeholder(
            tf.float32, shape=(Model.batchSize,
                               Model.imgSize[0],
                               Model.imgSize[1]))
        cnnOut4d = self.setupCNN(self.inputImgs)

        # RNN
        rnnOut3d = self.setupRNN(cnnOut4d)

        # CTC
        (self.loss, self.decoder) = self.setupCTC(rnnOut3d)

        # optimizer for NN parameters
        self.batchesTrained = 0
        self.learningRate = tf.placeholder(tf.float32, shape=[])
        self.optimizer = tf.train.RMSPropOptimizer(
            self.learningRate).minimize(self.loss)

        # initialize TF
        (self.sess, self.saver) = self.setupTF()

    def setupCNN(self, cnnIn3d):
        "create CNN layers and return output of these layers"
        cnnIn4d = tf.expand_dims(input=cnnIn3d, axis=3)

        # list of parameters for the layers
        kernelVals = [5, 5, 3, 3, 3]
        featureVals = [1, 32, 64, 128, 128, 256]
        strideVals = poolVals = [(2, 2), (2, 2), (1, 2), (1, 2), (1, 2)]
        numLayers = len(strideVals)

        # create layers
        pool = cnnIn4d  # input to first CNN layer
        for i in range(numLayers):
            kernel = tf.Variable(tf.truncated_normal(
                [kernelVals[i], kernelVals[i], featureVals[i],
                 featureVals[i + 1]], stddev=0.1))
            conv = tf.nn.conv2d(pool, kernel, padding='SAME',
                                strides=(1, 1, 1, 1))
            relu = tf.nn.relu(conv)
            pool = tf.nn.max_pool(relu,
                                  (1, poolVals[i][0], poolVals[i][1], 1),
                                  (1, strideVals[i][0], strideVals[i][1], 1),
                                  'VALID')

        return pool

    def setupRNN(self, rnnIn4d):
        "create RNN layers and return output of these layers"
        rnnIn3d = tf.squeeze(rnnIn4d, axis=[2])

        # basic cells which is used to build RNN
        numHidden = 256
        # 2 layers
        cells = [
            tf.contrib.rnn.LSTMCell(
                num_units=numHidden, state_is_tuple=True) for _ in range(2)]

        # stack basic cells
        stacked = tf.contrib.rnn.MultiRNNCell(cells, state_is_tuple=True)

        # bidirectional RNN
        # BxTxF -> BxTx2H
        ((fw, bw), _) = tf.nn.bidirectional_dynamic_rnn(
            cell_fw=stacked, cell_bw=stacked, inputs=rnnIn3d,
            dtype=rnnIn3d.dtype)

        # BxTxH + BxTxH -> BxTx2H -> BxTx1X2H
        concat = tf.expand_dims(tf.concat([fw, bw], 2), 2)

        # project output to chars (including blank):
        # BxTx1x2H -> BxTx1xC -> BxTxC
        kernel = tf.Variable(
            tf.truncated_normal(
                [1, 1, numHidden * 2, len(self.charList) + 1], stddev=0.1))
        return tf.squeeze(tf.nn.atrous_conv2d(
            value=concat, filters=kernel, rate=1, padding='SAME'), axis=[2])

    def setupCTC(self, ctcIn3d):
        "create CTC loss and decoder and return them"
        # BxTxC -> TxBxC
        ctcIn3dTBC = tf.transpose(ctcIn3d, [1, 0, 2])
        # ground truth text as sparse tensor
        self.gtTexts = tf.SparseTensor(
            tf.placeholder(tf.int64, shape=[None, 2]),
            tf.placeholder(tf.int32, [None]),
            tf.placeholder(tf.int64, [2]))
        # calc loss for batch
        self.seqLen = tf.placeholder(tf.int32, [None])
        loss = tf.nn.ctc_loss(
            labels=self.gtTexts, inputs=ctcIn3dTBC,
            sequence_length=self.seqLen, ctc_merge_repeated=True)
        # decoder: either best path decoding or beam search decoding
        if self.decoderType == DecoderType.BestPath:
            decoder = tf.nn.ctc_greedy_decoder(inputs=ctcIn3dTBC,
                                               sequence_length=self.seqLen)
        elif self.decoderType == DecoderType.BeamSearch:
            decoder = tf.nn.ctc_beam_search_decoder(
                inputs=ctcIn3dTBC, sequence_length=self.seqLen, beam_width=50,
                merge_repeated=False)
        elif self.decoderType == DecoderType.WordBeamSearch:
            # import compiled word beam search operation
            # (see https://github.com/githubharald/CTCWordBeamSearch)
            word_beam_search_module = tf.load_op_library('TFWordBeamSearch.so')

            # prepare information about language (dictionary, characters in
            # dataset, characters forming words)
            chars = str().join(self.charList)
            wordChars = open(
                modelDir + 'wordCharList.txt').read().splitlines()[0]
            corpus = open(modelDir + 'corpus.txt').read()

            # decode using the "Words" mode of word beam search
            decoder = word_beam_search_module.word_beam_search(
                tf.nn.softmax(ctcIn3dTBC, dim=2), 50, 'Words', 0.0,
                corpus.encode('utf8'), chars.encode('utf8'),
                wordChars.encode('utf8'))

        # return a CTC operation to compute the loss and a CTC operation to
        # decode the RNN output
        return (tf.reduce_mean(loss), decoder)

    def setupTF(self):
        "initialize TF"
        print('Python: ' + sys.version)
        print('Tensorflow: ' + tf.__version__)

        # TF session
        sess = tf.Session()
        # saver saves model to file
        saver = tf.train.Saver(max_to_keep=1)
        # is there a saved model?
        latestSnapshot = tf.train.latest_checkpoint(modelDir)

        # if model must be restored (for inference), there must be a snapshot
        if self.mustRestore and not latestSnapshot:
            raise Exception('No saved model found in: ' + modelDir)

        # load saved model if available
        if latestSnapshot:
            print('Init with stored values from ' + latestSnapshot)
            saver.restore(sess, latestSnapshot)
        else:
            print('Init with new values')
            sess.run(tf.global_variables_initializer())

        return (sess, saver)

    def toSparse(self, texts):
        "put ground truth texts into sparse tensor for ctc_loss"
        indices = []
        values = []
        shape = [len(texts), 0]  # last entry must be max(labelList[i])

        # go over all texts
        for (batchElement, text) in enumerate(texts):
            # convert to string of label (i.e. class-ids)
            labelStr = [self.charList.index(c) for c in text]
            # sparse tensor must have size of max. label-string
            if len(labelStr) > shape[1]:
                shape[1] = len(labelStr)
            # put each label into sparse tensor
            for (i, label) in enumerate(labelStr):
                indices.append([batchElement, i])
                values.append(label)

        return (indices, values, shape)

    def decoderOutputToText(self, ctcOutput):
        "extract texts from output of CTC decoder"

        # contains string of labels for each batch element
        encodedLabelStrs = [[] for i in range(Model.batchSize)]

        # word beam search: label strings terminated by blank
        if self.decoderType == DecoderType.WordBeamSearch:
            blank = len(self.charList)
            for b in range(Model.batchSize):
                for label in ctcOutput[b]:
                    if label == blank:
                        break
                    encodedLabelStrs[b].append(label)

        # TF decoders: label strings are contained in sparse tensor
        else:
            # ctc returns tuple, first element is SparseTensor
            decoded = ctcOutput[0][0]

            # go over all indices and save mapping: batch -> values
            # idxDict = {b: [] for b in range(Model.batchSize)}
            for (idx, idx2d) in enumerate(decoded.indices):
                label = decoded.values[idx]
                # index according to [b,t]
                batchElement = idx2d[0]
                encodedLabelStrs[batchElement].append(label)

        # map labels to chars for all batch elements
        return [
            str().join([self.charList[c] for c in labelStr]) for
            labelStr in encodedLabelStrs]

    def trainBatch(self, batch):
        "feed a batch into the NN to train it"
        sparse = self.toSparse(batch.gtTexts)
        # decay learning rate
        rate = 0.01 if self.batchesTrained < 10 else (
            0.001 if self.batchesTrained < 10000 else 0.0001)
        (_, lossVal) = self.sess.run(
            [self.optimizer, self.loss],
            {self.inputImgs: batch.imgs,
             self.gtTexts: sparse,
             self.seqLen: [Model.maxTextLen] * Model.batchSize,
             self.learningRate: rate})
        self.batchesTrained += 1
        return lossVal

    def inferBatch(self, batch):
        "feed a batch into the NN to recngnize the texts"
        decoded = self.sess.run(
            self.decoder,
            {self.inputImgs: batch.imgs,
             self.seqLen: [Model.maxTextLen] * Model.batchSize})
        return self.decoderOutputToText(decoded)

    def save(self):
        "save model to file"
        self.snapID += 1
        self.saver.save(self.sess, 'model/snapshot', global_step=self.snapID)

In [0]:
def infer(model, fnImg):
    "recognize text in image provided by file path"
    img = preprocess(cv2.imread(fnImg, cv2.IMREAD_GRAYSCALE), Model.imgSize)
    # fill all batch elements with same input image
    batch = Batch(None, [img] * Model.batchSize)
    # recognize text
    recognized = model.inferBatch(batch)
    # all batch elements hold same result
    print('Recognized:', '"' + recognized[0] + '"')

In [0]:
def main():
    "main function"

    decoderType = DecoderType.BestPath

    print(open(FilePaths.fnAccuracy).read())
    model = Model(open(FilePaths.fnCharList).read(),
                  decoderType, mustRestore=True)
    infer(model, FilePaths.fnInfer)

In [12]:
if __name__ == '__main__':
    main()

Validation character error rate of saved model: 13.956289%
The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.

Instructions for updating:
This class is equivalent as tf.keras.layers.LSTMCell, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
This class is equivalent as tf.keras.layers.StackedRNNCells, and will be replaced by that in Tensorflow 2.0.
Instructions for updating:
Please use `keras.layers.Bidirectional(keras.layers.RNN(cell))`, which is equivalent to this API
Instructions for updating:
Please use `keras.layers.RNN(cell)`, which is equivalent to this API
Instructions for updating:
Please use `layer.add_weight` method instead.
Instructions for up