In [None]:
import os
import cv2
import glob
import datetime
import numpy as np

import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.optim import (
    Adam,
    lr_scheduler
)
from torch.utils.data import (
    Dataset,
    DataLoader
)

from tqdm.notebook import tqdm
from PIL import (
    Image,
    ImageDraw,
    ImageFont,
    ImageFilter
)

In [None]:
class SmallBasicBlock(nn.Module):
    def __init__(
        self,
        in_channels,
        out_channels,
        activation,
        config
    ):
        """
        Small Basic Block: 1x1 conv -> (k x 1) conv -> (1 x k) conv -> 1x1 conv,
        with the given activation after each of the first three convolutions.

        :param in_channels: Channel of Input Tensor (int)
        :param out_channels: Channel of Output Tensor (int)
        :param activation: Activation module instance (torch.nn); the same
            instance is reused at every activation position of the block
        :param config:
            config["sbb_factor"]: Divisor applied to out_channels to get the
                hidden width (int), e.g. 4
            config["sbb_kernel_size"]: Length of the two 1-D convolution
                kernels (odd int), e.g. 5
        """
        super().__init__()

        hidden = out_channels // config["sbb_factor"]
        k = config["sbb_kernel_size"]
        half = k // 2  # padding that keeps H / W unchanged for an odd k

        self.in_channels = in_channels
        self.out_channels = out_channels
        self.hidden_size = hidden
        self.kernel_size = k
        self.activation = activation

        # Registered on the module (so it shows up in parameters() and the
        # state_dict) but deliberately not inserted into self.block below.
        self.bn = nn.BatchNorm2d(hidden)

        squeeze = nn.Conv2d(in_channels, hidden, kernel_size=1)
        along_height = nn.Conv2d(
            hidden, hidden, kernel_size=(k, 1), padding=(half, 0)
        )
        along_width = nn.Conv2d(
            hidden, hidden, kernel_size=(1, k), padding=(0, half)
        )
        expand = nn.Conv2d(hidden, out_channels, kernel_size=1)

        self.block = nn.Sequential(
            squeeze,
            activation,
            along_height,
            activation,
            along_width,
            activation,
            expand,
        )

    def forward(self, x):
        """Apply the four-convolution stack to x and return the result."""
        out = self.block(x)
        return out
    
    
class ConvBlock(nn.Module):
    def __init__(
        self,
        in_channels,
        out_channels,
        kernel_size,
        stride,
        padding,
        activation
    ):
        """
        Convolution followed by Batch Normalization and an optional activation.

        :param in_channels: Channel of Input Tensor (int)
        :param out_channels: Channel of Output Tensor (int)
        :param kernel_size: Kernel Size of Convolution Layer (int / tuple)
        :param stride: Stride of Convolution Layer (int / tuple)
        :param padding: Padding Size of Convolution Layer (int / tuple)
        :param activation: Activation module (torch.nn) applied after the
            batch norm, or None to return the normalized output as-is
        """
        super(ConvBlock, self).__init__()

        self.block = nn.Sequential(
            nn.Conv2d(
                in_channels=in_channels,
                out_channels=out_channels,
                kernel_size=kernel_size,
                stride=stride,
                padding=padding
            ),
            nn.BatchNorm2d(out_channels)
        )
        self.activation = activation

    def forward(self, x):
        """Run conv + batch norm, then the activation when one was supplied."""
        x = self.block(x)

        # Identity check: `!= None` would go through __ne__ instead.
        if self.activation is not None:
            return self.activation(x)
        return x
    
    
class LPRNet(nn.Module):
    def __init__(self, config):
        """
        LPRNet (https://arxiv.org/abs/1806.10447)

        Every key below is read with ``config[...]`` and is therefore required
        (there are no built-in defaults). The same dict is also handed to each
        SmallBasicBlock, which reads its own keys from it.

        :param config:
            config["class_num"]: Num of Class (int)
            config["in_channels"]: Input's Channel Size (int), e.g. 3
            config["hidden_size"]: Hidden Size of Convolution Layer (int), e.g. 64
            config["kernel_hidden"]: First Convolution Layer's Kernel Size (int), e.g. 3
            config["kernel_wide"]: Kernel Width of the Convolution between the
                two Dropout layers (int), e.g. 4
            config["kernel_out"]: Final Convolution Layer's Kernel Height (int), e.g. 13
            config["activation"]: "relu" / "relu6" / "leaky_relu" / "prelu"
                (str, case-insensitive), e.g. "relu"
            config["dropout"]: Dropout Rate (float), e.g. 0.5
        :raises ValueError: if config["activation"] is not one of the names above
        """
        super(LPRNet, self).__init__()

        self.class_num     = config["class_num"]
        self.in_channels   = config["in_channels"]
        self.hidden_size   = config["hidden_size"]
        self.kernel_hidden = config["kernel_hidden"]
        self.kernel_wide   = config["kernel_wide"]
        self.kernel_out    = config["kernel_out"]
        self.act_conf      = config["activation"].lower()
        self.dropout       = config["dropout"]

        activation_factories = {
            "relu": nn.ReLU,
            "relu6": nn.ReLU6,
            "leaky_relu": lambda: nn.LeakyReLU(0.2),
            "prelu": nn.PReLU,
        }
        # Fail early with a clear message; previously an unknown name left
        # self.activation unset and surfaced later as an AttributeError.
        if self.act_conf not in activation_factories:
            raise ValueError(
                "Unsupported activation %r; expected one of %s"
                % (config["activation"], sorted(activation_factories))
            )
        # A single activation instance is shared by every layer of the backbone.
        self.activation = activation_factories[self.act_conf]()

        self.backbone = nn.Sequential(
            ConvBlock(
                in_channels=self.in_channels,
                out_channels=self.hidden_size,
                kernel_size=self.kernel_hidden,
                stride=1,
                padding=self.kernel_hidden//2,
                activation=self.activation
            ),
            nn.MaxPool3d(
                kernel_size=(1, 3, 3),
                stride=(1, 1, 1)
            ),

            SmallBasicBlock(
                in_channels=self.hidden_size,
                out_channels=self.hidden_size*2,
                activation=self.activation,
                config=config
            ),
            # NOTE(review): the next block takes hidden_size inputs although the
            # previous one emits hidden_size*2, so the leading stride of 2 is
            # presumably applied along the channel axis of the 4-D activation
            # (MaxPool3d treating it as (C, D, H, W)) - confirm against the
            # installed PyTorch version.
            nn.MaxPool3d(
                kernel_size=(1, 3, 3),
                stride=(2, 1, 2)
            ),

            SmallBasicBlock(
                in_channels=self.hidden_size,
                out_channels=self.hidden_size*4,
                activation=self.activation,
                config=config
            ),
            SmallBasicBlock(
                in_channels=self.hidden_size*4,
                out_channels=self.hidden_size*4,
                activation=self.activation,
                config=config
            ),
            # Same pattern with a leading stride of 4: hidden_size*4 channels
            # go in, and the following ConvBlock expects hidden_size.
            nn.MaxPool3d(
                kernel_size=(1, 3, 3),
                stride=(4, 1, 2)
            ),

            nn.Dropout(self.dropout),
            ConvBlock(
                in_channels=self.hidden_size,
                out_channels=self.hidden_size*4,
                kernel_size=(1, self.kernel_wide),
                stride=1,
                padding=0,
                activation=self.activation
            ),
            nn.Dropout(self.dropout),
            # Final projection to per-class scores; no activation here because
            # forward() applies log_softmax itself.
            ConvBlock(
                in_channels=self.hidden_size*4,
                out_channels=self.class_num,
                kernel_size=(self.kernel_out, 1),
                stride=1,
                padding=0,
                activation=None
            )
        )

        self.backbone.apply(self.init_weights)


    def init_weights(self, layer):
        """Xavier-uniform initialise the weight of Linear / Conv2d layers."""
        if isinstance(layer, (nn.Linear, nn.Conv2d)):
            nn.init.xavier_uniform_(layer.weight)


    def forward(self, x):
        """
        Run the backbone, average over dim 2 and return log-probabilities.

        :param x: input batch; assumed to be (batch, in_channels, H, W) - TODO confirm
        :return: log_softmax over dim 1 of the dim-2-averaged backbone output,
            i.e. a 3-D tensor laid out as (batch, class_num, steps)
        """
        x = self.backbone(x)
        x = torch.mean(x, dim=2)

        return F.log_softmax(x, dim=1)

In [None]:
class KoreanLPDataset(Dataset):
    """
    Licence-plate image dataset whose labels are encoded in the file names.

    Each item is ``(image_tensor, label, label_length)`` where the label is the
    sequence of class indices obtained by mapping every character of the file
    name (without directory and extension) through ``class_dict``.
    """

    def __init__(
        self,
        image_list,
        class_dict,
        data_size
    ):
        """
        :param image_list: Paths of the image files (sequence of str)
        :param class_dict: Mapping from label character to class index (dict)
        :param data_size: Target size passed to cv2.resize, which interprets it
            as (width, height)
        """
        self.x = image_list
        self.class_dict = class_dict
        self.data_size = data_size

        self.labels = self.build_sequences(image_list)


    def __len__(self):
        return len(self.labels)


    def norm_img(self, img):
        """
        Normalise an image array and resize it to ``self.data_size``.

        The image is dilated with a 5x5 elliptical kernel and median-blurred
        (aperture 15); the original is subtracted from that blurred version,
        the difference is inverted (255 - diff), stretched to the full 0..255
        range and finally resized.
        """
        kernel = cv2.getStructuringElement(cv2.MORPH_ELLIPSE, (5,5))
        dilated = cv2.morphologyEx(img, cv2.MORPH_DILATE, kernel)

        median = cv2.medianBlur(dilated, 15)
        diff = 255 - cv2.subtract(median, img)

        normed = cv2.normalize(diff, None, 0, 255, cv2.NORM_MINMAX)
        normed = cv2.resize(normed, self.data_size)

        return normed


    def img_to_tensor(self, img, factor=255.):
        """
        Convert an image to a float32 tensor scaled by ``1 / factor``.

        :param img: Anything np.array() accepts; must yield a 3-D (H, W, C) array
        :param factor: Divisor applied after normalisation (float)
        :return: Tensor with the axes reordered to (C, H, W)
        """
        arr = self.norm_img(np.array(img)) / factor

        # (H, W, C) -> (C, H, W); same layout the chained transposes produced.
        return torch.tensor(arr, dtype=torch.float32).permute(2, 0, 1)


    def build_sequences(self, image_list):
        """
        Turn every file name into a list of class indices.

        :param image_list: Paths of the image files (sequence of str)
        :return: np.array built from the per-file index lists
        :raises KeyError: if a file name contains a character missing from class_dict
        """
        labels = list()

        for path in image_list:
            # splitext instead of [:-4]: also correct for ".jpeg" or files
            # without any extension.
            class_seq = os.path.splitext(os.path.basename(path))[0]

            try:
                sequence = [self.class_dict[char] for char in class_seq]
            except KeyError as err:
                raise KeyError(
                    "Unknown character %r in label parsed from %r"
                    % (err.args[0], path)
                ) from None

            labels.append(sequence)

        # NOTE(review): labels of different lengths do not form a regular 2-D
        # array here - presumably all plates share one length; verify the data.
        return np.array(labels)


    def __getitem__(self, idx):
        label = self.labels[idx]
        tgt_len = len(label)

        # Context manager so the file handle is released right after decoding.
        with Image.open(self.x[idx]) as img:
            tensor = self.img_to_tensor(img)

        return tensor, label, tgt_len

In [None]:
# Index 0 is the "<BLANK>" token; the ten digits follow as indices 1..10.
NUMBERS = ["<BLANK>"] + [str(digit) for digit in range(10)]

# Hangul syllables that may appear in a label, grouped by vehicle category.
CHARS = [
    # Private vehicles
    '가', '나', '다', '라', '마',
    '거', '너', '더', '러', '머', '버', '서', '어', '저',
    '고', '노', '도', '로', '모', '보', '소', '오', '조',
    '구', '누', '두', '루', '무', '부', '수', '우', '주',

    # Commercial vehicles
    '바', '사', '아', '자',

    # Military vehicles
    '공', '해', '육', '합',

    # Others
    '허', '하', '호', '배',
]

# Symbol -> class index, numbered in the order NUMBERS then CHARS.
CLASS_DICT = {
    symbol: index for index, symbol in enumerate(NUMBERS + CHARS)
}
# Reverse lookup: class index -> symbol.
idx_to_word = {index: symbol for symbol, index in CLASS_DICT.items()}

In [None]:
BATCH_SIZE = 32

# Every entry directly inside the training folder is treated as one sample.
train_imgs = glob.glob("./dataset/train/*")

# data_size is forwarded to cv2.resize by the dataset.
ds_train = KoreanLPDataset(train_imgs, CLASS_DICT, (94, 24))
train_loader = DataLoader(
    dataset=ds_train,
    batch_size=BATCH_SIZE,
    shuffle=True,
)

In [None]:
# NOTE(review): this path starts with "../" while the training cell uses "./" -
# confirm which location is intended.
test_imgs = glob.glob("../dataset/test/*.jpg")

# Use the list globbed above; the previous `data_list` name is not defined
# anywhere in the notebook and failed on a fresh kernel.
ds_test = KoreanLPDataset(
    image_list=test_imgs,
    class_dict=CLASS_DICT,
    data_size=(94, 24)
)
# One plate per batch, in a fixed order.
test_loader = DataLoader(ds_test, batch_size=1, shuffle=False)

In [None]:
def count_parameters(model):
    """Return the total element count of all parameters with requires_grad set."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

# Hyper-parameters read by LPRNet; the "sbb_*" keys are read by SmallBasicBlock.
model_config = {
    "class_num": len(CLASS_DICT),
    "in_channels": 3,
    "hidden_size": 64,
    "kernel_hidden": 3,
    "kernel_wide": 4,
    "kernel_out": 13,
    "sbb_factor": 4,
    "sbb_kernel_size": 5,
    "activation": "relu",
    "dropout": 0.5,
}
model = LPRNet(model_config)

K = 1000
optimizer = Adam(params=model.parameters(), lr=0.001)
# The learning rate is multiplied by 0.1 after every 100 * K scheduler steps.
scheduler = lr_scheduler.StepLR(optimizer, step_size=100*K, gamma=0.1)
loss_fn = nn.CTCLoss()

# Move the model and the loss module to the GPU when one is present.
if torch.cuda.is_available():
    model = model.cuda()
    loss_fn = loss_fn.cuda()

print("Trainable Parameters:", count_parameters(model))
print(model)

In [None]:
def ctc_decoding(logits_batch, idx_to_word):
    """
    Greedy CTC decoding: best class per step, merge repeats, drop blanks.

    :param logits_batch: Scores laid out as (batch, class, step) (torch.Tensor)
    :param idx_to_word: Mapping from class index to symbol; index 0 is the blank
    :return: np.ndarray with one decoded symbol sequence per batch entry. When
        all sequences have the same length this is a regular 2-D array;
        otherwise it is a 1-D object array holding one list per entry.
    """
    # Arg-max over the class axis gives one class index per (sample, step).
    best_paths = torch.argmax(logits_batch, dim=1).detach().cpu().tolist()

    results = list()
    for path in best_paths:
        decoded = list()
        previous = 0  # behave as if a blank preceded the first step

        for _class in path:
            # Emit only when the class changes and is not the blank (0).
            if _class != previous and _class != 0:
                decoded.append(idx_to_word[_class])
            previous = _class

        results.append(decoded)

    if len({len(seq) for seq in results}) <= 1:
        return np.array(results)

    # Unequal lengths: np.array() raises on recent NumPy versions, so fill a
    # 1-D object array element by element instead.
    ragged = np.empty(len(results), dtype=object)
    for i, seq in enumerate(results):
        ragged[i] = seq
    return ragged

In [None]:
best_score = 0.0

def evaluation(model, _iter, idx_to_word):
    """
    Report mean per-character accuracy and checkpoint the model on improvement.

    Every sample of every batch is decoded greedily and compared position by
    position with its gold label; a sample's score is the fraction of gold
    characters matched. With batches of one 7-character plate this equals the
    former first-sample / divide-by-7 computation.

    Side effects: prints a running accuracy, updates the module-level
    ``best_score`` and, when the mean accuracy exceeds it, saves
    ``model.state_dict()`` to "./lprnet.pth".

    :param model: Network to evaluate; the caller is expected to have put it
        in eval mode
    :param _iter: Iterable yielding (images, labels, label_lengths) batches
    :param idx_to_word: Mapping from class index to symbol (dict)
    """
    global best_score

    # Follow the model's device instead of assuming CUDA is available.
    device = next(model.parameters()).device

    accu = 0.0
    sample_count = 0
    gold, pred = list(), list()

    with torch.no_grad():
        for lps, labels, tgt_len in _iter:
            logits = model(lps.to(device))
            decoded = ctc_decoding(logits, idx_to_word)

            for label, length, pred in zip(labels, tgt_len, decoded):
                gold = [idx_to_word[c] for c in label[:int(length)].tolist()]

                # zip stops at the shorter sequence, so a short prediction
                # simply scores no points for the missing positions.
                hits = sum(1 for g, p in zip(gold, pred) if g == str(p))

                accu += hits / max(len(gold), 1)
                sample_count += 1

            if sample_count:
                print("\rAccuracy %.4lf" % (accu / sample_count), end="")

    if sample_count == 0:
        # Nothing was evaluated; leave best_score and the checkpoint untouched.
        print("\nNo samples to evaluate.")
        return

    mean_accu = accu / sample_count
    if best_score < mean_accu:
        best_score = mean_accu
        print("\nBest Score! %.4lf" % best_score)
        torch.save(model.state_dict(), "./lprnet.pth")

    # Show the last evaluated sample as a quick sanity check.
    print("\nGold:", gold)
    print("Pred:", pred[:10])

In [None]:
EPOCHS = 1000
iteration = 0
history = list()  # (epoch number, mean training loss) per finished epoch

# Fail early with a clear message instead of a NameError on `batch_idx` later.
if len(train_loader) == 0:
    raise RuntimeError("train_loader yields no batches; check the training image path")

print(">> Training Start:", datetime.datetime.now())

for epoch in range(EPOCHS):
    model.train()
    start_time = datetime.datetime.now()
    batch_loss = 0.0
    print(">> Current Learning Rate:", scheduler.get_last_lr())

    for batch_idx, batch in enumerate(train_loader):
        lps, labels, tgt_len = batch
        optimizer.zero_grad()

        if torch.cuda.is_available():
            lps = lps.cuda()
            labels = labels.cuda()
            tgt_len = tgt_len.cuda()

        logits = model(lps)
        # (batch, class, step) -> (step, batch, class), the layout nn.CTCLoss expects.
        logits = logits.permute(2, 0, 1)

        # Every sample uses all output steps. Created on the logits' device so
        # the cell also runs without CUDA.
        src_len = torch.full(
            size=(lps.shape[0],),
            fill_value=logits.shape[0],
            dtype=torch.long,
            device=logits.device
        )

        loss = loss_fn(logits, labels, src_len, tgt_len)
        batch_loss += loss.item()

        loss.backward()
        optimizer.step()
        # The scheduler is stepped once per batch, not once per epoch.
        scheduler.step()

        iteration += 1

        print("\r>> Epoch: %4d | Iteration: %6d | Process: %4d / %4d | Loss: %.4lf" % \
              (epoch+1, iteration, batch_idx+1, len(train_loader), (batch_loss / (batch_idx + 1))), end="")

    history.append((epoch+1, (batch_loss / (batch_idx + 1))))

    end_time = datetime.datetime.now()
    print("\n>> Time / Epoch: ", end_time - start_time)

    model.eval()
    # evaluation() requires the index -> symbol mapping as its third argument.
    evaluation(model, test_loader, idx_to_word)
    print("")