# Import

In [1]:
!pip install flask-ngrok

Collecting flask-ngrok
  Downloading https://files.pythonhosted.org/packages/af/6c/f54cb686ad1129e27d125d182f90f52b32f284e6c8df58c1bae54fa1adbc/flask_ngrok-0.0.25-py3-none-any.whl
Installing collected packages: flask-ngrok
Successfully installed flask-ngrok-0.0.25


In [15]:
import glob
import os
import os.path as osp
import random
import numpy as np
import json
from PIL import Image
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as data
import torchvision
from torchvision import models, transforms
from random import *
import pandas as pd


def set_seed(seed=123):
    """Seed the Python, PyTorch and NumPy random number generators.

    Args:
        seed (int): value applied to all three generators. Defaults to 123.
    """
    # The notebook executes `from random import *` after `import random`,
    # which rebinds the global name `random` to the function `random.random`.
    # Import the module under a private alias so `.seed` is always reachable.
    import random as _random_module

    _random_module.seed(seed)
    torch.manual_seed(seed)
    np.random.seed(seed)

# Utils

In [3]:
class ImageTransform:
    """Callable preprocessing pipeline built from torchvision transforms.

    The pipeline applies, in order: ``Resize(resize)``, ``Grayscale()``,
    ``ToTensor()`` and ``Normalize(mean, std)``. ``mean`` and ``std`` are the
    module-level values, read when the instance is created.

    Args:
        resize: target size forwarded to ``transforms.Resize``.
    """

    def __init__(self, resize):
        steps = [
            transforms.Resize(resize),
            transforms.Grayscale(),
            transforms.ToTensor(),
            transforms.Normalize(mean, std),
        ]
        self.data_transform = transforms.Compose(steps)

    def __call__(self, img, phase="train"):
        # `phase` is accepted but not used: one pipeline serves every phase.
        pipeline = self.data_transform
        return pipeline(img)


def make_lfile(rootpath):
    """Return the top-level keys of ``<rootpath>/labels.json`` as a list.

    Args:
        rootpath (str): directory that contains ``labels.json``.

    Returns:
        list: the keys of the JSON object, in file order.
    """
    labels_path = rootpath + "/labels.json"
    with open(labels_path, encoding="utf-8") as handle:
        labels = json.load(handle)
    return [*labels.keys()]

# list_image, list_label = make_datapath_list(phase="test_data")
# print(list_image)


class MyDataset(data.Dataset):
    """Dataset of images whose labels are stored in ``<root>/labels.json``.

    Args:
        root (str): directory holding the image files and ``labels.json``.
        flist (list of str): image file names, relative to ``root``; each must
            be a key of ``labels.json``.
        transform (callable, optional): applied to every loaded PIL image.
            When None, the loaded image is returned untransformed.
    """

    def __init__(self, root, flist, transform=None):
        self.flist = flist
        self.root = root
        self.transform = transform
        with open(osp.join(root, "labels.json"), encoding="utf-8") as json_file:
            self.dict_label = json.load(json_file)

    def __len__(self):
        return len(self.flist)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the ``idx``-th file name in ``flist``."""
        name_image = self.flist[idx]
        img_path = osp.join(self.root, name_image)
        # Mode "1" makes PIL convert the picture to a 1-bit black/white image.
        img = Image.open(img_path).convert("1")
        # `transform` defaults to None, so only call it when one was supplied.
        if self.transform is not None:
            img = self.transform(img)

        label = self.dict_label[name_image]
        return img, label



In [4]:
import collections
class strLabelConverter(object):
    """Convert between str and label.
    NOTE:
        Insert `blank` to the alphabet for CTC.
    Args:
        alphabet (str): set of the possible characters.
        ignore_case (bool, default=True): whether or not to ignore all of the case.
    """

    def __init__(self, alphabet, ignore_case=True):
        self._ignore_case = ignore_case
        if self._ignore_case:
            alphabet = alphabet.lower()
        # The trailing '-' is what index -1 resolves to, so label 0 (blank)
        # is rendered as '-' by decode(raw=True).
        self.alphabet = alphabet + '-'

        # NOTE: 0 is reserved for 'blank' required by CTC, so labels start at 1.
        self.dict = {char: i + 1 for i, char in enumerate(alphabet)}

    def encode(self, text):
        """Support batch or single str.
        Args:
            text (str or iterable of str): texts to convert.
        Returns:
            torch.IntTensor [length_0 + length_1 + ... length_{n - 1}]: encoded texts.
            torch.IntTensor [n]: length of each text.
        Raises:
            TypeError: when `text` is neither a str nor an iterable of str.
            KeyError: when a character is not part of the alphabet.
        """
        # `collections.Iterable` was removed in Python 3.10; the ABC lives in
        # `collections.abc`.
        from collections.abc import Iterable

        if isinstance(text, str):
            lengths = [len(text)]
            joined = text
        elif isinstance(text, Iterable):
            # Materialise first so one-shot iterators can be read twice.
            text = list(text)
            lengths = [len(s) for s in text]
            joined = ''.join(text)
        else:
            raise TypeError(
                "text must be a str or an iterable of str, got {}".format(type(text).__name__)
            )

        codes = [
            self.dict[char.lower() if self._ignore_case else char]
            for char in joined
        ]
        return (torch.IntTensor(codes), torch.IntTensor(lengths))

    def decode(self, t, length, raw=False):
        """Decode encoded texts back into strs.
        Args:
            t (torch.IntTensor [length_0 + length_1 + ... length_{n - 1}]): encoded texts.
            length (torch.IntTensor [n]): length of each text.
            raw (bool, default=False): when True every label is mapped to a
                character (blank becomes '-'); otherwise blanks are dropped
                and consecutive repeats are collapsed.
        Raises:
            AssertionError: when the texts and its length does not match.
        Returns:
            text (str or list of str): decoded texts.
        """
        if length.numel() == 1:
            count = int(length.reshape(-1)[0])
            assert t.numel() == count, "text with length: {} does not match declared length: {}".format(t.numel(), count)
            # One transfer to plain Python ints instead of indexing the tensor
            # element by element; flattening also accepts [T, 1] inputs.
            labels = t.reshape(-1).tolist()
            if raw:
                return ''.join(self.alphabet[i - 1] for i in labels)
            char_list = []
            previous = None
            for current in labels:
                if current != 0 and current != previous:
                    char_list.append(self.alphabet[current - 1])
                previous = current
            return ''.join(char_list)

        # batch mode
        total = int(length.sum())
        assert t.numel() == total, "texts with length: {} does not match declared length: {}".format(t.numel(), total)
        flat = t.reshape(-1)
        texts = []
        index = 0
        for count in length.reshape(-1).tolist():
            texts.append(
                self.decode(flat[index:index + count], torch.IntTensor([count]), raw=raw)
            )
            index += count
        return texts

# Model

In [5]:
class BidirectionalLSTM(nn.Module):
    """Bidirectional LSTM followed by a linear projection of every time step.

    Args:
        nIn: number of input features.
        nHidden: hidden size of each LSTM direction.
        nOut: number of output features per time step.
        dropout: dropout probability applied to the LSTM output (default 0).

    Shapes (the LSTM is created with the default ``batch_first=False``):
        input:  [seq_len, batch, nIn]
        output: [seq_len, batch, nOut]
    """

    def __init__(self, nIn, nHidden, nOut, dropout=0):
        super().__init__()

        # Sub-modules are created in the order rnn -> embedding -> dropout.
        self.rnn = nn.LSTM(input_size=nIn, hidden_size=nHidden, bidirectional=True)
        # Both directions are concatenated, hence 2 * nHidden input features.
        self.embedding = nn.Linear(in_features=2 * nHidden, out_features=nOut)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input):
        lstm_out, _state = self.rnn(input)
        lstm_out = self.dropout(lstm_out)

        seq_len, batch, width = lstm_out.size()
        # Merge time and batch so the linear layer sees a 2-D [T * b, h] matrix.
        projected = self.embedding(lstm_out.view(seq_len * batch, width))  # [T * b, nOut]
        return projected.view(seq_len, batch, -1)


class CRNN(nn.Module):
    """Convolutional feature extractor followed by two bidirectional LSTM layers.

    Args:
        imgH: input image height; only checked to be a multiple of 16.
        nc: number of input channels.
        nclass: number of output classes per time step.
        nh: hidden size of the recurrent layers.
        n_rnn: accepted for interface compatibility; not used by this class.
        leakyRelu: use ``LeakyReLU(0.2)`` instead of ``ReLU`` after each conv.

    Shapes:
        input:  [batch, nc, height, width]; the conv stack must reduce the
                height to exactly 1.
        output: [width', batch, nclass] log-probabilities.
    """

    def __init__(self, imgH, nc, nclass, nh, n_rnn=2, leakyRelu=False):
        super().__init__()
        assert imgH % 16 == 0, "imgH has to be a multiple of 16"

        self.log_softmax = nn.LogSoftmax(-1)

        # One entry per conv layer: (out_channels, kernel, stride, padding).
        conv_specs = [
            (64, 3, 1, 1),
            (128, 3, 1, 1),
            (256, 3, 1, 1),
            (256, 3, 1, 1),
            (512, 3, 1, 1),
            (512, 3, 1, 1),
            (512, 2, 1, 0),
        ]
        # Pooling layer inserted after the conv layer with the given index.
        pool_after = {
            0: nn.MaxPool2d(2, 2),
            1: nn.MaxPool2d(2, 2),
            3: nn.MaxPool2d((2, 2), (2, 2), (0, 0)),
            # Stride 1 along the width keeps more time steps for the RNN.
            5: nn.MaxPool2d((2, 2), (2, 1), (0, 0)),
        }

        # Module names (conv{i}, batchnorm{i}, relu{i}, pooling{j}) and their
        # order define the state-dict keys, so they are kept exactly.
        cnn = nn.Sequential()
        in_channels = nc
        pool_index = 0
        for idx, (out_channels, kernel, stride, padding) in enumerate(conv_specs):
            cnn.add_module(
                "conv{0}".format(idx),
                nn.Conv2d(in_channels, out_channels, kernel, stride, padding),
            )
            cnn.add_module("batchnorm{0}".format(idx), nn.BatchNorm2d(out_channels))
            activation = nn.LeakyReLU(0.2, inplace=True) if leakyRelu else nn.ReLU(True)
            cnn.add_module("relu{0}".format(idx), activation)
            if idx in pool_after:
                cnn.add_module("pooling{0}".format(pool_index), pool_after[idx])
                pool_index += 1
            in_channels = out_channels

        self.cnn = cnn

        # nh is the hidden size; the second layer maps to the class scores.
        self.rnn = nn.Sequential(
            BidirectionalLSTM(512, nh, nh, 0), BidirectionalLSTM(nh, nh, nclass, 0)
        )

    def forward(self, input):
        # conv features
        features = self.cnn(input)
        _, _, height, _ = features.size()
        assert height == 1, "the height of conv must be 1"
        # Drop the height axis and make the width the sequence axis.
        sequence = features.squeeze(2).permute(2, 0, 1)  # [w, b, c]
        # rnn features
        return self.log_softmax(self.rnn(sequence))

# Hyperparams

In [6]:
# Dataset folders on the mounted Google Drive.
train_path = "/content/drive/MyDrive/Colab Notebooks/OCR/Cinnamon_VN_data/train"
val_path = "/content/drive/MyDrive/Colab Notebooks/OCR/Cinnamon_VN_data/validate"

# Read the alphabet as UTF-8 (same encoding as the labels.json readers) and
# close the file deterministically instead of leaking the handle.
with open(
    osp.join("/content/drive/MyDrive/Colab Notebooks/OCR/Cinnamon_VN_data", "alphabet.txt"),
    encoding="utf-8",
) as alphabet_file:
    alphabet = alphabet_file.read().rstrip()
# One extra class: label 0 is reserved for the CTC blank.
nclass = len(alphabet) + 1

lr = 0.01
epochs = 9
batch_size = 32
# Three-channel statistics, kept for reference; the pipeline is single-channel.
# mean = (0.485, 0.456, 0.406)
# std = (0.229, 0.224, 0.225)
mean = (0.5,)
std = (0.5,)
# Size passed to transforms.Resize: (height, width).
resize = (32, 824)

# Dataloader

In [7]:
# Character <-> label mapping. ignore_case=False leaves the alphabet as read,
# so upper- and lower-case characters get distinct labels.
converter = strLabelConverter(alphabet, ignore_case=False)
print(converter.dict)
# Dataset / dataloader construction is left commented out in this notebook.
# train_dataset = MyDataset(train_path, make_lfile(train_path), transform=ImageTransform(resize))
# val_dataset = MyDataset(val_path, make_lfile(val_path), transform=ImageTransform(resize))

# train_dataloader = torch.utils.data.DataLoader(train_dataset, batch_size, shuffle=True)
# val_dataloader = torch.utils.data.DataLoader(val_dataset, batch_size, shuffle=False)
# dataloader_dict = {"train":train_dataloader, "val":val_dataloader}

{'!': 1, '"': 2, '$': 3, '%': 4, '&': 5, "'": 6, '(': 7, ')': 8, '+': 9, ',': 10, '-': 11, '.': 12, '/': 13, ' ': 14, '#': 15, '0': 16, '1': 17, '2': 18, '3': 19, '4': 20, '5': 21, '6': 22, '7': 23, '8': 24, '9': 25, ':': 26, ';': 27, '?': 28, 'A': 29, 'B': 30, 'C': 31, 'D': 32, 'E': 33, 'F': 34, 'G': 35, 'H': 36, 'I': 37, 'J': 38, 'K': 39, 'L': 40, 'M': 41, 'N': 42, 'O': 43, 'P': 44, 'Q': 45, 'R': 46, 'S': 47, 'T': 48, 'U': 49, 'V': 50, 'W': 51, 'X': 52, 'Y': 53, 'Z': 54, '[': 55, '\\': 56, ']': 57, '^': 58, '_': 59, 'a': 60, 'b': 61, 'c': 62, 'd': 63, 'e': 64, 'f': 65, 'g': 66, 'h': 67, 'i': 68, 'j': 69, 'k': 70, 'l': 71, 'm': 72, 'n': 73, 'o': 74, 'p': 75, 'q': 76, 'r': 77, 's': 78, 't': 79, 'u': 80, 'v': 81, 'w': 82, 'x': 83, 'y': 84, 'z': 85, '{': 86, '|': 87, '}': 88, '°': 89, '²': 90, 'À': 91, 'Á': 92, 'Â': 93, 'Ã': 94, 'È': 95, 'É': 96, 'Ê': 97, 'Ì': 98, 'Í': 99, 'Ð': 100, 'Ò': 101, 'Ó': 102, 'Ô': 103, 'Õ': 104, 'Ö': 105, 'Ù': 106, 'Ú': 107, 'Ü': 108, 'Ý': 109, 'à': 110, 'á': 1

# Load Model

In [8]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
DEVICE = torch.device("cuda" if torch.cuda.is_available() else 'cpu')

In [9]:
from torch.nn import CTCLoss

# CTC criterion; it is created here but not called in the cells shown.
ctc_loss = CTCLoss().to(DEVICE)
# imgH only feeds CRNN's "multiple of 16" assertion, so pass the height the
# images are actually resized to instead of an unrelated 224.
model = CRNN(resize[0], nc=1, nclass=nclass, nh=32).to(DEVICE)
# The optimizer is needed because load_model() also restores its state.
optimizer = optim.AdamW(params=model.parameters(), lr=lr)

def weights_init(m):
    """Xavier-uniform initialise the weights of Conv2d and Linear modules.

    Meant to be passed to ``model.apply``. Biases and every other module type
    (LSTM, BatchNorm, ...) are left untouched.

    Args:
        m (nn.Module): the module being visited.
    """
    if isinstance(m, (nn.Conv2d, nn.Linear)):
        # `nn.init.xavier_uniform` (no trailing underscore) is deprecated in
        # favour of the in-place `xavier_uniform_`.
        nn.init.xavier_uniform_(m.weight.data)

# Visit every sub-module of the model (and the model itself) with weights_init.
model.apply(weights_init)
print("Done")

Done


  
  # This is added back by InteractiveShellApp.init_path()


In [10]:
# Directory of the saved run; the checkpoint file inside it is loaded below.
path = "/content/drive/MyDrive/Colab Notebooks/OCR/saved_model/biimg_resize824_batch32_epoch9"
def load_model(model, optim, checkpoints):
    """Restore model and optimizer state from a checkpoint mapping.

    Args:
        model: object with ``load_state_dict``; receives ``checkpoints["model_dict"]``.
        optim: object with ``load_state_dict``; receives ``checkpoints["optimizer"]``.
        checkpoints: mapping holding the ``"model_dict"`` and ``"optimizer"`` entries.
    """
    print("Loading model")
    # Model first, then optimizer.
    restore_plan = ((model, "model_dict"), (optim, "optimizer"))
    for target, key in restore_plan:
        target.load_state_dict(checkpoints[key])

# map_location remaps the stored tensors onto DEVICE, so a checkpoint saved on
# a GPU still loads on a CPU-only runtime.
# NOTE: torch.load unpickles the file; only load checkpoints you trust.
load_model(model, optimizer, torch.load(path + "/weights.pth.rar", map_location=DEVICE))

Loading model


# Image preprocessing

In [36]:
import cv2
from google.colab.patches import cv2_imshow
def imgprc(imagefilenpath):
    """Binarise an image, pad it on the right with white, and save it under /content.

    Steps: read the file, convert to grayscale, divide the image by its 8x8
    dilation, apply Otsu thresholding, and, when width / height < 20, extend
    the right edge with white pixels up to a width / height ratio of 824 / 32.

    Args:
        imagefilenpath (str): path of the image to process.

    Returns:
        str: path of the written image, ``'/content/'`` plus the input's file name.

    Raises:
        FileNotFoundError: if OpenCV cannot read ``imagefilenpath``.
        IOError: if the processed image cannot be written.
    """
    image = cv2.imread(imagefilenpath)
    if image is None:
        # cv2.imread reports failure by returning None instead of raising.
        raise FileNotFoundError("Cannot read image: {}".format(imagefilenpath))
    image = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)

    # Dilation serves as a background estimate; dividing by it rescales every
    # pixel relative to its neighbourhood before the global Otsu threshold.
    se = cv2.getStructuringElement(cv2.MORPH_RECT, (8, 8))
    bg = cv2.morphologyEx(image, cv2.MORPH_DILATE, se)
    out_gray = cv2.divide(image, bg, scale=255)
    out_binary = cv2.threshold(out_gray, 0, 255, cv2.THRESH_OTSU)[1]

    height, width = out_binary.shape
    target_ratio = 1.0 * 824 / 32  # width / height of the model input
    pad_below_ratio = 20           # images at least this wide are left as they are
    ratio = 1.0 * width / height
    if ratio < pad_below_ratio:
        pad_right = int(np.floor((target_ratio - ratio) * height))
        out_binary = cv2.copyMakeBorder(
            src=out_binary,
            top=0,
            bottom=0,
            left=0,
            right=pad_right,
            borderType=cv2.BORDER_CONSTANT,
            value=[255, 255, 255],
        )

    imagefilename = '/content/' + osp.basename(imagefilenpath)
    if not cv2.imwrite(imagefilename, out_binary):
        raise IOError("Cannot write image: {}".format(imagefilename))
    return imagefilename

In [12]:
# Preprocessing pipeline used by get_results(), built for the configured `resize`.
imgtrf = ImageTransform(resize)

# Get output

In [13]:
from scipy.special import softmax
import editdistance

def string_similarity(target, got):
    """Return 1 minus the edit distance divided by the longer string's length.

    Args:
        target (str): reference text.
        got (str): text to compare with the reference.

    Returns:
        float: 1.0 for identical strings (including two empty strings),
        decreasing towards 0.0 as more edits are needed.
    """
    longest = max(len(target), len(got))
    if longest == 0:
        # Both strings are empty: identical, and dividing by zero must be avoided.
        return 1.0
    return 1 - 1.0 * editdistance.eval(target, got) / longest

def beam_search_decoder(data, k=20):
    """Keep the ``k`` lowest-cost label paths through a score matrix.

    Args:
        data (torch.Tensor): per-step class scores; after ``torch.squeeze``
            it must be 2-D, [steps, classes]. A softmax over the class axis
            is applied before scoring.
        k (int): beam width.

    Returns:
        list: up to ``k`` entries ``[labels, cost]`` ordered by ascending
        cost, where ``labels`` is the list of class indices (one per step)
        and ``cost`` is the summed negative log-probability.
    """
    probabilities = softmax(torch.squeeze(data.cpu()).detach().numpy(), axis=1)

    beams = [[list(), 0.0]]
    # walk over each step in sequence
    for step_probs in probabilities:
        class_count = len(step_probs)
        # expand every surviving path by every class
        expanded = [
            [prefix + [cls], cost - np.log(step_probs[cls])]
            for prefix, cost in beams
            for cls in range(class_count)
        ]
        # sorted() is stable, so ties keep their expansion order
        beams = sorted(expanded, key=lambda candidate: candidate[1])[:k]
    return beams

def get_results(imgfn):
    """Run the OCR model on one image file and return the distinct decoded texts.

    Args:
        imgfn (str): path of the image to recognise.

    Returns:
        list of str: decoded beam-search candidates, best first, with
        duplicates removed.
    """
    # Close the file handle once the pixels have been converted.
    with Image.open(imgfn) as source:
        img = imgtrf(source.convert("1"))
    batch = torch.unsqueeze(img, 0).to(DEVICE)

    # Inference: BatchNorm must use its running statistics (and must not
    # update them on every request), and no autograd graph is needed.
    model.eval()
    with torch.no_grad():
        output = model(batch)

    final_results = []
    for labels, _cost in beam_search_decoder(output):
        # A CPU tensor works on every runtime (torch.cuda.IntTensor needs a
        # GPU), and the length is taken from the path itself rather than
        # recomputed from the resize width.
        encoded = torch.IntTensor(labels)
        text = converter.decode(encoded, torch.IntTensor([len(labels)]))
        if text not in final_results:
            final_results.append(text)
    return final_results

# App

In [70]:
from flask import Flask
from flask import Flask, render_template, request, flash, url_for, send_file, jsonify, redirect
from flask_ngrok import run_with_ngrok
# Static files and templates are served from the mounted Drive; uploaded
# images are also written into the static folder by home().
static_folder='/content/drive/MyDrive/Colab Notebooks/OCR/Demo/static'
template_folder='/content/drive/MyDrive/Colab Notebooks/OCR/Demo/templates'
app = Flask(__name__, static_folder=static_folder, template_folder=template_folder)
# flask_ngrok hook: the app is published through an ngrok URL once app.run() starts.
run_with_ngrok(app)

@app.route("/", methods=["GET", "POST"])
def home():
    """Serve the upload form (GET) or run OCR on the uploaded image (POST).

    On POST the file from the ``file`` form field is saved into
    ``static_folder``, preprocessed with ``imgprc`` and recognised with
    ``get_results``; ``result.html`` is rendered with the relative file name
    and the list of candidate texts.
    """
    if request.method == "POST":
        file = request.files['file']
        # The file name is chosen by the client: keep only its last path
        # component so the upload cannot be written outside static_folder.
        fn = os.path.basename((file.filename or "").replace("\\", "/"))
        if fn in ("", ".", ".."):
            # No usable file was submitted; show the upload form again.
            return render_template("home.html")
        fn0 = os.path.join(static_folder, fn)
        file.save(fn0)
        fn1 = imgprc(fn0)
        result = get_results(fn1)
        fn = './' + fn
        print(len(result))
        return render_template('result.html', fn=fn, result=result)
    return render_template("home.html")

# Start the Flask server; this call blocks the cell until it is interrupted.
app.run()

 * Serving Flask app "__main__" (lazy loading)
 * Environment: production
[2m   Use a production WSGI server instead.[0m
 * Debug mode: off


 * Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)


 * Running on http://8ba762fe733d.ngrok.io
 * Traffic stats available on http://127.0.0.1:4040


127.0.0.1 - - [31/May/2021 07:46:38] "[37mGET / HTTP/1.1[0m" 200 -
127.0.0.1 - - [31/May/2021 07:46:40] "[37mGET / HTTP/1.1[0m" 200 -
127.0.0.1 - - [31/May/2021 07:46:40] "[33mGET /favicon.ico HTTP/1.1[0m" 404 -
127.0.0.1 - - [31/May/2021 08:47:46] "[37mPOST / HTTP/1.1[0m" 200 -


14


127.0.0.1 - - [31/May/2021 08:47:46] "[37mGET /static/0038_tests.png HTTP/1.1[0m" 200 -
127.0.0.1 - - [31/May/2021 08:49:48] "[37mPOST / HTTP/1.1[0m" 200 -


3


127.0.0.1 - - [31/May/2021 08:49:48] "[37mGET /static/0014_tests.png HTTP/1.1[0m" 200 -
127.0.0.1 - - [31/May/2021 08:49:48] "[37mGET / HTTP/1.1[0m" 200 -
