In [None]:
import os
import fnmatch
import cv2
import numpy as np
import string
import time
from glob import glob
import matplotlib.pyplot as plt 

import torch.nn as nn
import torch
from torch.autograd import Variable
from torch.nn.utils.rnn import pad_sequence
import torch.utils.data as Data

# Preprocessing

In [None]:
# Mount Google Drive inside the Colab runtime so the dataset archive stored there can be read.
from google.colab import drive
drive.mount('/content/drive')
# Extract the archive into the current working directory.
# NOTE(review): presumably this produces the 'dataset/' folder that the loading cell reads — confirm.
!unzip '/content/drive/My Drive/OCR/Data/dataset.zip'

In [None]:
# !rm -rf dataset dataset.zip __MACOSX
# # small dataset
# !wget https://transfer.sh/MrGxw/dataset.zip
# # large dataset
# #!wget https://transfer.sh/NwLvB/dataset.zip
# !unzip -qq dataset.zip


In [None]:
# The model recognizes a word by predicting its characters/digits one by one.
# char_list is therefore the full label alphabet: every ASCII letter (capital and
# lower case are distinct symbols) followed by the ten digits.
# Each word is later turned into a vector of indices into this alphabet.

# char_list:   'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789'
char_list = string.ascii_lowercase + string.ascii_uppercase + string.digits
print('char_list:', char_list)
print('total length:', len(char_list))

char_list: abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789
total length: 62


In [None]:
# every word is encoded as a list of digits
# the digit for each character is represented by the index
# e.g. aabb -> [0,0,1,1], index of a is 0, index of b is 1

def encode_to_labels(txt, alphabet=None):
    """Encode a word as the list of its characters' indices in the alphabet.

    :param txt: the word to encode.
    :param alphabet: ordered collection of known characters. Defaults to the
        notebook-level ``char_list``, looked up when the function is called.
    :return: list of integer indices, one per known character of ``txt``.
        Characters that are not in the alphabet are printed and skipped, so the
        result can be shorter than ``txt``.
    """
    if alphabet is None:
        alphabet = char_list

    dig_lst = []
    for char in txt:
        try:
            dig_lst.append(alphabet.index(char))
        except ValueError:
            # Unknown symbol: report it and carry on (best-effort encoding).
            # Only the "not found" error is caught so real bugs still surface.
            print(char)

    return dig_lst

In [None]:
from sklearn.model_selection import train_test_split

path = 'dataset/'

x = [] # the images, each a (1, 32, 128) array scaled to [0, 1]
y = [] # the char vector (encoded label) of every image
x_len = [] # CTC input length of every image (time steps produced by the network)
y_len = [] # CTC target length of every image (number of encoded characters)
orig_y = [] # the original word

max_label_len = 0

# not read anywhere in this cell; kept so the notebook-level name stays defined
flag = 0

# sorted(): glob returns files in arbitrary order, and a fixed order keeps the
# seeded train/validation split reproducible across runs
for i, f_name in enumerate(sorted(glob(os.path.join(path,'*/*.jpg')))):

    # cv2.imread reports an unreadable file by returning None (no exception),
    # which would otherwise crash inside cvtColor
    img = cv2.imread(f_name)
    if img is None:
        print('skipping unreadable image:', f_name)
        continue

    # get the text from the image: it is the second '_'-separated field of the file name
    name_parts = os.path.basename(f_name).split('_')
    if len(name_parts) < 2:
        print('skipping file without a label in its name:', f_name)
        continue
    txt = name_parts[1]

    # convert into gray scale image
    img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # cv2.resize takes (width, height), so the array becomes (32, 128);
    # the channel axis is added in front -> (1, 32, 128)
    img = cv2.resize(img,(128,32))
    img = np.expand_dims(img , axis = 0)

    # Normalize each image
    img = img/255.

    # encode_to_labels skips characters outside char_list, so every length is
    # taken from the encoded label to stay consistent with the CTC target
    label = encode_to_labels(txt)

    # compute maximum length of the encoded text
    if len(label) > max_label_len:
        max_label_len = len(label)

    x.append(img)
    y.append(label)
    # 128 input columns -> 32 after the two 2x2 poolings -> 31 after the last
    # unpadded 2x2 convolution of the CRNN defined in this notebook
    x_len.append(31)
    y_len.append(len(label))
    orig_y.append(str(txt))

In [None]:
# Right-pad ("post" padding) every encoded label up to the longest label length.
# The filler is not zero (0 is the index of 'a') but len(char_list), i.e. one
# past the last valid character index.

for vector in y:
  vector += [len(char_list)] * (max_label_len - len(vector))

# Model Architecture

In [None]:
class CRNN(nn.Module):
  """Convolutional-recurrent network for word images, meant to be trained with CTC.

  Seven convolutional stages shrink a 32-pixel-high image to height 1 while
  keeping a horizontal sequence of 512-channel feature columns. A two-layer
  bidirectional LSTM reads that sequence and a linear layer scores every time
  step over ``len(char_list) + 1`` classes (the characters plus one extra class).
  """

  def __init__(self, char_list):
    """
    :param char_list: the label alphabet; only its length is used
    """
    super(CRNN, self).__init__()

    # Input with shape of height = 32 and width = 128

    # Conv2D: 64 filters, kernels (3,3), rectified unit
    # Pooling: size (2, 2), stride 2
    self.layer1 = nn.Sequential(
        nn.Conv2d(in_channels = 1, out_channels = 64, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(2, stride = 2)
    )

    self.layer2 = nn.Sequential(
        nn.Conv2d(in_channels = 64, out_channels = 128, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d(2, stride = 2)
    )

    self.layer3 = nn.Sequential(
        nn.Conv2d(in_channels = 128, out_channels = 256, kernel_size = 3, padding = 1),
        nn.ReLU()
    )

    # From here on only the height is pooled, so the width (time axis) is kept
    self.layer4 = nn.Sequential(
        nn.Conv2d(in_channels = 256, out_channels = 256, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.MaxPool2d((2,1))
    )

    # Batch normalization layer,
    # blog: https://towardsdatascience.com/batch-normalization-in-neural-networks-1ac91516821c
    self.layer5 = nn.Sequential(
        nn.Conv2d(in_channels = 256, out_channels = 512, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.BatchNorm2d(512) # number of channels
    )

    self.layer6 = nn.Sequential(
        nn.Conv2d(in_channels = 512, out_channels = 512, kernel_size = 3, padding = 1),
        nn.ReLU(),
        nn.BatchNorm2d(512),
        nn.MaxPool2d((2, 1))
    )

    # Unpadded 2x2 convolution: height 2 -> 1, width W -> W - 1
    self.layer7 = nn.Sequential(
        nn.Conv2d(in_channels = 512, out_channels = 512, kernel_size = 2),
        nn.ReLU()
    )

    # batch_first is left at its default (False) because forward() feeds the
    # LSTM a (seq_len, batch, features) tensor. With batch_first = True the
    # recurrence would run across the samples of the batch instead of across time.
    self.lstm = nn.LSTM(input_size = 512, hidden_size = 128, num_layers = 2, dropout = 0.2, bidirectional = True)

    # 256 = 2 directions * 128 hidden units
    self.fc = nn.Linear(in_features = 256, out_features = len(char_list) + 1)

    # nn.CTCLoss expects log-probabilities, so the head is a LogSoftmax
    # (a plain Softmax makes the CTC loss meaningless and possibly negative)
    self.log_softmax = nn.LogSoftmax(dim = 2)

  def forward(self, x, device = None):
    """
    :param x: image batch of shape (batch_size, 1, 32, width)
    :param device: unused; accepted so existing ``model(x, device)`` calls keep working
    :return: log-probabilities of shape (seq_len, batch_size, len(char_list) + 1);
             seq_len is 31 for 128-pixel-wide inputs
    :raises ValueError: if the convolutional stack does not reduce the height to 1
    """

    # x.shape = (batch_size, channel_size, height, width)

    # DCNN
    output = self.layer1(x)
    output = self.layer2(output)
    output = self.layer3(output)
    output = self.layer4(output)
    output = self.layer5(output)
    output = self.layer6(output)
    output = self.layer7(output)

    # squeeze() silently does nothing when the axis is not 1, so check explicitly
    if output.shape[2] != 1:
      raise ValueError('expected feature height 1 after the CNN, got {0}; input images must be 32 pixels high'.format(output.shape[2]))

    # drop the height axis: (batch_size, 512, seq_len)
    output = torch.squeeze(output, 2)

    # LSTM: we want to return sequences, not the last output
    # the LSTM input layout is (seq_len, batch, input_size), so reorder the axes
    output = output.permute(2, 0, 1)

    # No initial state is passed: the LSTM then starts from zero h and c
    # on the same device as its input
    output, (_, _) = self.lstm(output)

    # our final output has [len(char_list)+1] classes
    output = self.fc(output)
    output = self.log_softmax(output)

    return output


# Model Training

## Training per epoch

In [None]:
def train(train_loader, model, loss_function, optimizer, i, num_epoch, device):
    """
    Performs one epoch's training.
    :param train_loader: iterable of batches
        (images, padded targets, input lengths, target lengths, word ids)
    :param model: model, called as model(images, device); its output is handed
        to loss_function as the first argument
    :param loss_function: loss layer, called as
        loss_function(prediction, targets, input_lengths, target_lengths)
    :param optimizer: optimizer to update the model's weights
    :param i: current epoch number (only used in the status line)
    :param num_epoch: total number of epochs (only used in the status line)
    :param device: device the images and targets are moved to
    """

    model.train()  # train mode
    losses = []

    # Load by batches
    for (train_x, train_y, train_x_len, train_y_len, train_orig_y) in train_loader:

        train_x = train_x.to(device)
        # targets are class indices, so they must be integers
        train_y = train_y.to(device = device, dtype = torch.long)

        # CTC_LOSS: https://zhuanlan.zhihu.com/p/108547594
        # ctc_loss shape: https://zhuanlan.zhihu.com/p/67415439
        # input: (seq_len, batch_size, length of char list)
        # target: (batch_size, len_)
        train_x_len = train_x_len.type(torch.LongTensor)
        train_y_len = train_y_len.type(torch.LongTensor)

        # start every step from clean gradients
        optimizer.zero_grad()
        predict_y = model(train_x, device)

        # use the loss that was passed in, not a notebook-level global
        loss = loss_function(predict_y, train_y, train_x_len, train_y_len)
        loss.backward()
        optimizer.step()

        # Keep track of metrics
        losses.append(loss.item())

    # Print status for each epoch (the sum of the per-batch losses)
    print('Epoch: [{0}/{1}] \t Train_loss: {2}'.format(i, num_epoch, sum(losses)))


In [None]:
from sklearn import preprocessing

# Tensors cannot hold strings, so every original word in orig_y is replaced by
# an integer id that can be stored in a torch.Tensor next to its image.
le = preprocessing.LabelEncoder()
label_y = le.fit_transform(orig_y)

# Use le.inverse_transform(label_y) to map the ids back to the list of words

In [None]:
# Transfer all variables into torch tensors.
# - np.array() first stacks the per-sample arrays/lists into one block, which is
#   far faster than building a tensor from a Python list of arrays.
# - Images are float32; labels, lengths and word ids are integers and are kept
#   as int64 instead of being stored as floats.
# - torch.autograd.Variable is deprecated; plain tensors behave the same.
x = torch.tensor(np.array(x), dtype = torch.float32)
y = torch.tensor(np.array(y), dtype = torch.long)
x_len = torch.tensor(np.array(x_len), dtype = torch.long)
y_len = torch.tensor(np.array(y_len), dtype = torch.long)
label_y = torch.tensor(np.array(label_y), dtype = torch.long)

# Train Test Split: 10% of the samples are held out; random_state fixes the split
train_x, val_x, train_y, val_y, train_x_len, val_x_len, train_y_len, val_y_len, train_label_y, val_label_y = train_test_split(x, y, x_len, y_len, label_y, test_size = 0.1, random_state = 2)

# Training and testing dataset
train_dataset = Data.TensorDataset(train_x, train_y, train_x_len, train_y_len, train_label_y)
test_dataset = Data.TensorDataset(val_x, val_y, val_x_len, val_y_len, val_label_y)

# DataLoader
batch_size = 200
# only the training data is shuffled; evaluation does not depend on sample order
train_loader = Data.DataLoader(train_dataset, batch_size = batch_size, shuffle = True)
test_loader = Data.DataLoader(test_dataset, batch_size = batch_size, shuffle = False)

In [None]:
# print('Number of training data:',len(train_x))
# print('Number of validation data:',len(val_x))

# plt.imshow(train_x[0][:,:,0], cmap='gray')
# print('Label value: ',train_y[0])
# print('Raw Label value: ', train_orig_y[0])

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Loss function: CTC Details: (https://theailearner.com/2019/05/29/connectionist-temporal-classificationctc/)
# The blank is the extra class after the characters: index len(char_list), the
# last of the len(char_list) + 1 outputs of the model.
ctc_loss = nn.CTCLoss(blank = len(char_list))
learning_rate = 0.01

model = CRNN(char_list)
model = model.to(device)
optimizer = torch.optim.Adam(model.parameters(), lr = learning_rate)

print('Train in {0} samples, validate in {1} samples \n'.format(len(train_x), len(val_x)))

# Run every epoch; a single train() call only ever performs epoch 1 of `epoch`
epoch = 10
for epoch_index in range(1, epoch + 1):
    train(train_loader, model, ctc_loss, optimizer, epoch_index, epoch, device)

Train in 2880 samples, validate in 321 samples 

Epoch: [1/10] 	 Train_loss: -57.0730676651001


## Validation per epoch

In [None]:
# NOTE(review): this function is written for an encoder/decoder captioning model
# scored with BLEU-4, not for the single CRNN + CTC model trained in this notebook.
# It also relies on names that are not defined or imported in the visible part of
# the notebook: AverageMeter, pack_padded_sequence, alpha_c, accuracy, word_map and
# corpus_bleu. Confirm whether it is still needed before calling it.
def validate(val_loader, encoder, decoder, loss_function):
    """
    Performs one epoch's validation.
    :param val_loader: DataLoader for validation data; each batch is unpacked
        into four values (imgs, caps, caplens, allcaps)
    :param encoder: encoder model, called as encoder(imgs)
    :param decoder: decoder model, called as decoder(encoded_imgs, caps, caplens)
        and expected to return five values
    :param loss_function: loss layer, called as loss_function(scores, targets)
    :return: BLEU-4 score
    """
    # eval mode (no dropout or batchnorm)
    decoder.eval()  
    encoder.eval()

    # running statistics of the loss and of the top-5 accuracy
    losses = AverageMeter()
    top5accs = AverageMeter()

    references = list()  # references (true captions) for calculating BLEU-4 score
    hypotheses = list()  # hypotheses (predictions)

    with torch.no_grad():
        # Batches
        for i, (imgs, caps, caplens, allcaps) in enumerate(val_loader):
            # `device` is read from the notebook-level scope, it is not a parameter
            imgs = imgs.to(device)
            caps = caps.to(device)
            caplens = caplens.to(device)

            # Forward prop.
            encoded_imgs = encoder(imgs)
            scores, caps_sorted, decode_lengths, alphas, sort_ind = decoder(encoded_imgs, caps, caplens)
            # drop the first token of every caption (the <start> token, per the filtering below — confirm)
            targets = caps_sorted[:, 1:]

            # Remove timesteps that we didn't decode at, or are pads
            # pack_padded_sequence is an easy trick to do this
            # keep the unpacked scores: they are needed for the hypotheses below
            scores_copy = scores.clone()

            # NOTE(review): the result is unpacked into four values — confirm this
            # matches the return type of the pack_padded_sequence in use
            scores, _, _, _ = pack_padded_sequence(scores, decode_lengths, batch_first=True)
            targets, _, _, _ = pack_padded_sequence(targets, decode_lengths, batch_first=True)

            # Calculate loss
            loss = loss_function(scores, targets)

            # Add doubly stochastic attention regularization
            loss += alpha_c * ((1. - alphas.sum(dim=1)) ** 2).mean()

            # Keep track of metrics
            losses.update(loss.item(), sum(decode_lengths))
            top5 = accuracy(scores, targets, 5)
            top5accs.update(top5, sum(decode_lengths))

            # progress line every 100 batches
            if i % 100 == 0:
                print('Validation: [{0}/{1}]\t'
                      'Loss {loss.val:.4f} ({loss.avg:.4f})\t'
                      'Top-5 Accuracy {top5.val:.3f} ({top5.avg:.3f})\t'.format(i, len(val_loader),
                                                                                loss=losses, top5=top5accs))

            # Store references (true captions), and hypothesis (prediction) for each image
            # If for n images, we have n hypotheses, and references a, b, c... for each image, we need -
            # references = [[ref1a, ref1b, ref1c], [ref2a, ref2b], ...], hypotheses = [hyp1, hyp2, ...]

            # References
            allcaps = allcaps[sort_ind]  # because images were sorted in the decoder
            for j in range(allcaps.shape[0]):
                img_caps = allcaps[j].tolist()
                img_captions = list(
                    map(lambda c: [w for w in c if w not in {word_map['<start>'], word_map['<pad>']}],
                        img_caps))  # remove <start> and pads
                references.append(img_captions)

            # Hypotheses: highest-scoring class at every time step
            _, preds = torch.max(scores_copy, dim=2)
            preds = preds.tolist()
            temp_preds = list()
            for j, p in enumerate(preds):
                temp_preds.append(preds[j][:decode_lengths[j]])  # remove pads
            preds = temp_preds
            hypotheses.extend(preds)

            assert len(references) == len(hypotheses)

        # Calculate BLEU-4 scores
        bleu4 = corpus_bleu(references, hypotheses)

        print(
            '\n * LOSS - {loss.avg:.3f}, TOP-5 ACCURACY - {top5.avg:.3f}, BLEU-4 - {bleu}\n'.format(
                loss=losses,
                top5=top5accs,
                bleu=bleu4))

    return bleu4


## Train the Model

In [None]:
# Download a trained model
!rm best_model.hdf5
!wget https://transfer.sh/11ClXx/best_model.hdf5

rm: cannot remove 'best_model.hdf5': No such file or directory
--2020-10-26 03:28:24--  https://transfer.sh/11ClXx/best_model.hdf5
Resolving transfer.sh (transfer.sh)... 144.76.136.153
Connecting to transfer.sh (transfer.sh)|144.76.136.153|:443... connected.
HTTP request sent, awaiting response... ^C


In [None]:

# NOTE(review): `act_model`, `K` and `val_orig_y` are not defined or imported in the
# visible part of this notebook, and load_weights / predict / K.ctc_decode look like
# the Keras API rather than the PyTorch CRNN defined above — confirm before running.

# load the saved best model weights
act_model.load_weights('best_model.hdf5')
 
# predict outputs on validation images (first 10 only)
prediction = act_model.predict(val_x[:10])
 
# use CTC decoder
# every sample is given the full number of time steps (prediction.shape[1]) as its input length
out = K.get_value(K.ctc_decode(prediction, input_length=np.ones(prediction.shape[0])*prediction.shape[1],
                         greedy=True)[0][0])

# see the results
# NOTE(review): the loop variable `x` rebinds the notebook-level name `x`
i = 0
for x in out:
    print("original_text =  ", val_orig_y[i])
    print("predicted text = ", end = '')
    for p in x:  
        # -1 entries are skipped; every other value is an index into char_list
        if int(p) != -1:
            print(char_list[int(p)], end = '')       
    print('\n')
    i+=1

In [None]:
'''
We want to know the accuracy [the number of correct/total number]
 1. if the predicted text == truth label, we count as 1
 2. otherwise, we count as 0
e.g.
If we have a word "Theword",
then the predicted text:
"Theword" -> 1
"Theward" -> 0
'''
# NOTE(review): `act_model`, `K` and `val_orig_y` are not defined or imported in the
# visible part of this notebook — confirm where they come from.
prediction = act_model.predict(val_x)
out = K.get_value(K.ctc_decode(prediction, input_length=np.ones(prediction.shape[0])*prediction.shape[1],
                         greedy=True)[0][0])

count = 0
#How to get count?
# TODO: `count` is never updated between its initialisation and the print below,
# so as written this reports an accuracy of 0 regardless of the predictions.
print('Accuracy:', count/len(val_orig_y))

# Inference

## Upload a picture

In [None]:
# Upload a picture through Colab's file helper; the uploaded file is the one
# img_path should point to further down ("where is the image you uploaded").
from google.colab import files
files.upload()

## Detection using open source

In [None]:
#Intall libs
#https://gitlab.gnome.org/World/OpenPaperwork/pyocr
!apt-get install tesseract-ocr
!pip install pyocr
!pip install wand
!pip install pillow
!apt-get install libmagickwand-dev
!apt-get install freetype imagemagick


In [None]:
import cv2
import numpy as np
import matplotlib.pyplot as plt

from wand.image import Image
from PIL import Image
import pyocr
import pyocr.builders
import io


In [None]:
# TODO: `?` is a placeholder — replace it with the path (a string) of the uploaded
# picture; the cell cannot run until it is filled in.
img_path = ? #where is the image you uploaded
# flag 0 loads the picture as a single-channel grayscale image
image = cv2.imread(img_path,0)
plt.figure(figsize=(15,15))
plt.imshow(image,cmap='gray')

In [None]:
# Pick the OCR backend: the first tool pyocr reports as available.
available_tools = pyocr.get_available_tools()
if not available_tools:
    # fail with an explanation instead of an IndexError on [0]
    raise RuntimeError('pyocr found no OCR tool; install tesseract-ocr first')
tool = available_tools[0]

# Pick the language: ask for English ('eng') explicitly when it is installed,
# because the first reported language is not guaranteed to be English.
available_langs = tool.get_available_languages()
if not available_langs:
    raise RuntimeError('the selected OCR tool reports no installed languages')
lang = 'eng' if 'eng' in available_langs else available_langs[0] #english

In [None]:
### Detection of words
# Fresh grayscale copy of the picture to draw the boxes on
image = cv2.imread(img_path, 0)

# One box per detected word; box.position holds the two corner points
word_boxes = tool.image_to_string(Image.open(img_path), lang=lang, builder=pyocr.builders.WordBoxBuilder())

for box in word_boxes:
    cv2.rectangle(image, *box.position[:2], color=(0,0,255), thickness=2)

plt.figure(figsize=(20,20))
plt.imshow(image, cmap='gray')

In [None]:
### Detection of lines
# Fresh grayscale copy of the picture to draw the boxes on
image = cv2.imread(img_path, 0)

# One box per detected text line; box.position holds the two corner points
line_boxes = tool.image_to_string(Image.open(img_path), lang=lang, builder=pyocr.builders.LineBoxBuilder())

for box in line_boxes:
    cv2.rectangle(image, *box.position[:2], color=(0,0,255), thickness=2)

plt.figure(figsize=(20,20))
plt.imshow(image, cmap='gray')



## Prediction by our OCR model

In [None]:
'''
Now we want to use the OCR model to predict each boxes
- We should use word_boxes instead of line_boxes(Why?)
- We should convert the structure of word_boxes to be the input of OCR model
Requirements:
1. Should crop the box from the original image
2. The crop should be resized to (128, 32, 1)
3. The crop Should be normalized to 0-1
'''

# Collect one model-ready crop per detected word box.
boxes_x = []
for box in word_boxes:
  # the two corner points of the box, as used for cv2.rectangle in the detection cells
  pos = box.position
  # TODO: `?` is a placeholder — build the crop according to the requirements
  # listed above; the cell cannot run until it is filled in.
  crop = ?
  boxes_x.append(crop)


In [None]:
# Once you have the input of the OCR model ready,
# use these functions to get the predicted result:
# 1. act_model.predict
# 2. K.get_value(K.ctc_decode(...))

# Note: K.get_value returns a list of lists of indexes, each index corresponds to a character in char_list
# e.g. you should use char_list[index] to recover the characters
#      [0, 1, 0, -1] -> 'a','b','a',blank
# TODO: `?` below is a placeholder for the prediction code; the cell cannot run until it is replaced.
?

## Visualize the result

In [None]:
# Now we want to visualize all the boxes and predicted text on the original image
# Hint:
# use cv2.rectangle to draw box
# use cv2.putText to draw text

# no flag is passed here, unlike the cv2.imread(img_path,0) calls in the detection cells
image = cv2.imread(img_path)
# TODO: `?` is a placeholder for the drawing code; the cell cannot run until it is replaced.
?
plt.figure(figsize=(20,20))
plt.imshow(image,cmap='gray')

## Output raw text

e.g. The example image should give output:

I hope you all learned something

from my class. Keep in touch !