In [None]:
import os
import random
import time
import timm
import xml.etree.ElementTree as ET

import cv2
import matplotlib.pyplot as plt
import numpy
import torch
import torch.nn as nn
import torchvision
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.nn import functional as F
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms

In [3]:
def extract_data_from_xml(root_dir):
    xml_path = os.path.join(root_dir, "words.xml")
    tree = ET.parse(xml_path)
    root = tree.getroot()

    image_path = []
    image_size = []
    image_label = []
    image_bb = []

    for img in root:
        bbs_of_img = []
        labels_of_img = []

        image_path.append(img[0].text)
        image_size.append((int(img[1].attrib['x']),
                           int(img[1].attrib['y'])))

        for bbs in img.findall('taggedRectangles'):
            for bb in bbs:

                if not bb[0].text.isalnum():
                    continue
                if "é" in bb[0].text.lower() or "ñ" in bb[0].text.lower():
                    continue

                bbs_of_img.append(
                    [
                        float(bb.attrib['x']),
                        float(bb.attrib['y']),
                        float(bb.attrib['width']),
                        float(bb.attrib['height'])
                    ]
                )
                labels_of_img.append(bb[0].text.lower())
        image_label.append(labels_of_img)
        image_bb.append(bbs_of_img)

    return image_path, image_size, image_label, image_bb

In [4]:
data_dir = "/content/dataset/SceneTrialTrain"
image_path, image_size, image_label, image_boxes = extract_data_from_xml(data_dir)

print("Image path: ", image_path[0])
print("Image size: ", image_size[0])
print("Image label: ", image_label[0])
print("Image boxes: ", image_boxes[0])

Image path:  apanar_06.08.2002/IMG_1261.JPG
Image size:  (1600, 1200)
Image label:  ['self', 'adhesive', 'address', 'labels', '36', '89m', 'cls', '250', 'on', 'a', 'roll']
Image boxes:  [[174.0, 392.0, 274.0, 195.0], [512.0, 391.0, 679.0, 183.0], [184.0, 612.0, 622.0, 174.0], [863.0, 599.0, 446.0, 187.0], [72.0, 6.0, 95.0, 87.0], [247.0, 2.0, 197.0, 88.0], [792.0, 0.0, 115.0, 81.0], [200.0, 848.0, 228.0, 139.0], [473.0, 878.0, 165.0, 109.0], [684.0, 878.0, 71.0, 106.0], [806.0, 844.0, 218.0, 141.0]]


In [6]:
def split_bounding_boxes(image_paths, image_labels, image_bbs, root_dir, save_dir):
    os.makedirs(save_dir, exist_ok=True)
    count = 0
    labels = []
    for image_path, image_label, image_bb in zip(image_paths, image_labels, image_bbs):
        image = Image.open(os.path.join(root_dir, image_path))

        for label, bb in zip(image_label, image_bb):
            image_cropped = image.crop((bb[0], bb[1], bb[0] + bb[2], bb[1] + bb[3]))
            print(np.mean(image_cropped))
            print(image_cropped.size)

            if np.mean(image_cropped) < 35 or np.mean(image_cropped) > 220:
                continue

            if image_cropped.size[0] < 10 or image_cropped.size[1] < 10:
                continue

            file_name = f"{count:06d}.jpg"
            image_cropped.save(os.path.join(save_dir, file_name))
            new_path = os.path.join(save_dir, file_name)
            label = new_path + "\t" + label
            labels.append(label)

            count += 1
    print(f"Create {count} images")

    with open(os.path.join(save_dir, "labels.txt"), 'w') as f:
        for label in labels:
            f.write(f"{label}\n")



split_bounding_boxes(image_paths=image_path, image_labels=image_label, image_bbs=image_boxes, root_dir=data_dir, save_dir="dataset_text_recognition")

179.74366460789818
(274, 195)
174.3938906191737
(679, 183)
168.40684788902442
(622, 174)
160.04178956539812
(446, 187)
200.70364992942126
(95, 87)
229.7518843254884
(197, 88)
238.98457684737878
(115, 81)
208.23240355084354
(228, 139)
205.55492540079695
(165, 109)
197.1346886349544
(71, 106)
210.4765328475069
(218, 141)
172.5817728782845
(847, 172)
167.66635515550965
(502, 81)
169.0295499800876
(540, 31)
155.38473713428533
(409, 69)
161.01171706728698
(265, 73)
160.42351492767006
(57, 19)
135.28183421516755
(63, 15)
155.9980694980695
(111, 14)
159.51071428571427
(70, 16)
152.62282282282283
(74, 15)
128.28710089399743
(145, 18)
128.42938468992247
(64, 43)
123.4662568306011
(61, 40)
175.6451133407655
(138, 52)
176.71166276346605
(350, 61)
160.43034722222222
(64, 75)
176.21172516803585
(103, 52)
164.99279388647648
(363, 59)
183.9180614394518
(374, 77)
183.0517496853814
(644, 102)
178.4957264957265
(99, 26)
187.4148148148148
(85, 27)
184.22317188983857
(54, 26)
179.11131313131312
(132, 25)


In [8]:
labels = []
image_paths = []

with open("/content/dataset_text_recognition/labels.txt", 'r') as f:
    for label in f:
        label_split = label.split('\t')
        labels.append(label_split[1])
        image_paths.append(label_split[0])

print(labels)
print(image_paths)


['dataset_text_recognition/000000.jpg', 'dataset_text_recognition/000001.jpg', 'dataset_text_recognition/000002.jpg', 'dataset_text_recognition/000003.jpg', 'dataset_text_recognition/000004.jpg', 'dataset_text_recognition/000005.jpg', 'dataset_text_recognition/000006.jpg', 'dataset_text_recognition/000007.jpg', 'dataset_text_recognition/000008.jpg', 'dataset_text_recognition/000009.jpg', 'dataset_text_recognition/000010.jpg', 'dataset_text_recognition/000011.jpg', 'dataset_text_recognition/000012.jpg', 'dataset_text_recognition/000013.jpg', 'dataset_text_recognition/000014.jpg', 'dataset_text_recognition/000015.jpg', 'dataset_text_recognition/000016.jpg', 'dataset_text_recognition/000017.jpg', 'dataset_text_recognition/000018.jpg', 'dataset_text_recognition/000019.jpg', 'dataset_text_recognition/000020.jpg', 'dataset_text_recognition/000021.jpg', 'dataset_text_recognition/000022.jpg', 'dataset_text_recognition/000023.jpg', 'dataset_text_recognition/000024.jpg', 'dataset_text_recognitio

In [9]:
letters = [char.split('.')[0].lower() for char in labels]
letters = "".join(letters)
letters = sorted(list(set(list(letters))))

chars = "".join(letters)
blank_char = "-"
chars += blank_char
vocab_size = len(chars)

print(f"Size vocab: {vocab_size}")
print(f"Vocab: {chars}")

Size vocab: 38
Vocab: 
0123456789abcdefghijklmnopqrstuvwxyz-


In [10]:
char_to_index = {char:index+1 for index, char in enumerate(sorted(chars))}
index_to_char = {index:char for char, index in char_to_index.items()}

In [11]:
max_len_label = max([len(label) for label in labels])

def encode(label, char_to_index, max_label_len):
    encoded_labels = torch.tensor(
        [char_to_index[char] for char in label],
        dtype=torch.int32
    )

    lable_lenght = len(encoded_labels)

    lenghts = torch.tensor(
        lable_lenght, dtype=torch.int32
    )

    padded_lables = F.pad(
        encoded_labels,
        (0, max_label_len - lable_lenght),
        value=0
    )

    return lenghts, padded_lables



In [12]:
def decode(encode_sequences, index_to_char, blank_char="-"):
    decode_sequences = []

    for seq in encode_sequences:
        decode_label = []
        prev_char = None

        for token in seq:
            if token != 0:
                char = index_to_char[token.item()]

                if char != blank_char:
                    if char != prev_char or prev_char == blank_char:
                        decode_label.append(char)

                pre_char = char
        decode_sequences.append("".join(decode_label))

    print(f"From {encode_sequences} to {decode_sequences}")
    return decode_sequences

In [13]:
data_transform = {
    "train" : transforms.Compose(
        [
            transforms.Resize((100, 420)),
            transforms.ColorJitter(
                brightness=0.5,
                contrast=0.5,
                saturation=0.5
            ),
            transforms.Grayscale(
                num_output_channels=1
            ),
            transforms.GaussianBlur(3),
            transforms.RandomAffine(
                degrees=1,
                shear=1
            ),
            transforms.RandomPerspective(
                distortion_scale=0.3,
                interpolation=3
            ),
            transforms.RandomRotation(
                degrees=2
            ),
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))
        ]
    ),

    "val" : transforms.Compose(
        [
            transforms.Resize((100, 420)),
            transforms.Grayscale(1),
            transforms.ToTensor(),
            transforms.Normalize((0.5,), (0.5,))
        ]
    )

}

In [14]:
val_size = 0.2
test_size = 0.125
random_state = 0
is_shuffle = True

X_train, X_val, y_train, y_val = train_test_split(
    image_paths, labels,
    test_size=val_size,
    random_state=random_state,
    shuffle=is_shuffle
)

X_train, X_test, y_train, y_test = train_test_split(
    X_train, y_train,
    test_size=test_size,
    random_state=random_state,
    shuffle=is_shuffle
)

print(f"Train: {len(X_train)}")
print(f"Validation: {len(X_val)}")
print(f"Test size: {len(X_test)}")


Train: 761
Validation: 218
Test size: 109


In [15]:
class STRDataset(Dataset):
    def __init__(self, X, y, char_to_index, max_len_label, label_encoder=None, transforms=None):
        self.X = X
        self.y = y
        self.char_to_index = char_to_index
        self.max_len_label = max_len_label
        self.label_encoder = label_encoder
        self.transforms = transforms

    def __len__(self):
        return len(self.X)

    def __getitem__(self, index):
        label = self.y[index]
        image_path = self.X[index]
        image = Image.open(image_path)

        if self.transforms:
            image = self.transforms(image)

        if self.label_encoder:
            len_label, label = self.label_encoder(label, char_to_index, max_len_label)

        return image, label, len_label


In [16]:
train_dataset = STRDataset(
    X_train, y_train,
    char_to_index,
    max_len_label,
    label_encoder=encode,
    transforms=data_transform['train']
)

val_dataset = STRDataset(
    X_val, y_val,
    char_to_index,
    max_len_label,
    label_encoder=encode,
    transforms=data_transform['val']
)

test_dataset = STRDataset(
    X_test, y_test,
    char_to_index,
    max_len_label,
    label_encoder=encode,
    transforms=data_transform['val']
)

train_batch_size = 64
test_batch_size = 64 * 2

train_loader = DataLoader(
    train_dataset,
    batch_size=train_batch_size,
    shuffle=True
)

val_loader = DataLoader(
    val_dataset,
    batch_size=test_batch_size,
    shuffle=False
)

test_loader = DataLoader(
    test_dataset,
    batch_size=test_batch_size,
    shuffle=False
)

In [17]:
class CRNN(nn.Module):

    def __init__(self, vocab_size, hidden_size, num_layer, unfreeze_layers=3, dropout=0.2):
        super(CRNN, self).__init__()
        backbone = timm.create_model("resnet152", pretrained=True, in_chans=1)
        # Tuong tac voi resnet152 model
        modules = list(backbone.children())[:-2]
        modules.append(nn.AdaptiveAvgPool2d((1, None)))
        self.backbone = nn.Sequential(*modules)

        for para in self.backbone[-unfreeze_layers:].parameters():
            para.requires_grad = True

        self.mapSeq = nn.Sequential(
            nn.Linear(2048, 512),
            nn.ReLU(),
            nn.Dropout(dropout)
        )

        self.GRU = nn.GRU(
            512, hidden_size,
            num_layer,
            dropout=dropout if num_layer > 1 else 0,
            bidirectional=True,
            batch_first=True
        )

        self.layer_norm = nn.LayerNorm(hidden_size*2)

        self.out = nn.Sequential(
            nn.Linear(hidden_size*2, vocab_size),
            nn.LogSoftmax(2)
        )

    def forward(self, x):
        x = self.backbone(x)
        x = x.permute(0, 3, 1, 2)
        x = x.view(x.size(0), x.size(1), -1)
        x = self.mapSeq(x)
        x, _ = self.GRU(x)
        x = self.layer_norm(x)
        x = self.out(x)
        x = x.permute(1, 0, 2)  #based CTC loss
        return x




In [None]:
hidden_size = 256
num_layer = 3
dropout = 0.2
unfreeze_layer = 3
device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Device: {device}")

models = CRNN(vocab_size, hidden_size, num_layer, unfreeze_layer, dropout=dropout).to(device)

In [19]:
def evaluate(model, dataloader, criterion, device):
    model.eval()
    losses = []

    with torch.no_grad():
        for inputs, labels, label_len in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            label_len = label_len.to(device)

            output = model(inputs)
            logits_lens = torch.full(
                size=(output.size(1),),
                fill_value=output.size(0),
                dtype=torch.long
            )
            loss = criterion(output, labels, logits_lens, label_len)
            losses.append(loss.item())
    loss = sum(losses) / len(losses)

    return loss




In [20]:
def train(model, dataloader, criterion, optimizer, scheduler, device, epochs):
    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        start = time.time()

        model.train()
        batch_train_loss = []
        for inputs, labels, labels_len in dataloader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            labels_len = labels_len.to(device)

            optimizer.zero_grad()
            output = model(inputs)

            logits_lens = torch.full(
                size=(output.size(1), ),
                fill_value=output.size(0),
                dtype=torch.long
            )
            loss = criterion(output, labels, logits_lens, labels_len)
            loss.backward()
            torch.nn.utils.clip_grad_norm_(model.parameters(), 5)
            optimizer.step()

            batch_train_loss.append(loss)

        train_loss = sum(batch_train_loss)/len(batch_train_loss)
        val_loss = evaluate(model, val_loader, criterion, device)

        train_losses.append(train_loss)
        val_losses.append(val_loss)
        print(
            f"EPOCH {epoch + 1}: \tTrain loss {train_loss:.4f} \tValidation loss {val_loss:.4f} \tTime {time.time() - start:.2f} seconds"
        )
        scheduler.step()

    return train_losses, val_losses


In [21]:
epochs = 100
lr = 5e-4
weight_decay = 1e-4
scheduler_step_size = epochs * 0.5

criterion = nn.CTCLoss(
    blank=char_to_index[blank_char],
    reduction="mean",
    zero_infinity=True
)

optimizer = torch.optim.Adam(
    models.parameters(),
    lr=lr,
    weight_decay=weight_decay,
)

scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=scheduler_step_size,
    gamma=0.1
)

In [22]:
train_losses, val_losses = train(models, train_loader, criterion, optimizer, scheduler, device, epochs)

EPOCH 1: 	Train loss 3.6159 	Validation loss 2.9756 	Time 7.83 seconds
EPOCH 2: 	Train loss 2.8239 	Validation loss 2.8151 	Time 5.80 seconds
EPOCH 3: 	Train loss 2.6675 	Validation loss 2.7326 	Time 5.64 seconds
EPOCH 4: 	Train loss 2.4844 	Validation loss 2.4643 	Time 5.64 seconds
EPOCH 5: 	Train loss 2.2273 	Validation loss 2.3105 	Time 5.68 seconds
EPOCH 6: 	Train loss 1.9405 	Validation loss 1.9031 	Time 5.76 seconds
EPOCH 7: 	Train loss 1.5690 	Validation loss 1.5759 	Time 5.76 seconds
EPOCH 8: 	Train loss 1.1738 	Validation loss 1.2897 	Time 5.65 seconds
EPOCH 9: 	Train loss 0.8713 	Validation loss 1.0361 	Time 5.76 seconds
EPOCH 10: 	Train loss 0.6927 	Validation loss 0.9766 	Time 5.68 seconds
EPOCH 11: 	Train loss 0.5678 	Validation loss 0.8655 	Time 5.80 seconds
EPOCH 12: 	Train loss 0.4592 	Validation loss 0.7623 	Time 5.60 seconds
EPOCH 13: 	Train loss 0.4014 	Validation loss 0.7141 	Time 5.86 seconds
EPOCH 14: 	Train loss 0.3462 	Validation loss 0.6520 	Time 5.63 seconds
E

In [23]:
torch.save(models.state_dict(), "/content/text_recognition.pt")