In [1]:
#!pip install easyocr
#!git clone https://github.com/clovaai/deep-text-recognition-benchmark.git
# !pip install natsort

In [2]:
# TODO: add augmentation and auto-tuning, or just use EasyOCR

In [3]:
import sys
# Put the locally cloned deep-text-recognition-benchmark checkout on the import
# path; the later `dataset`, `utils` and `modules.*` imports presumably resolve
# to that repository — confirm it has been cloned next to this notebook.
sys.path.append('./deep-text-recognition-benchmark/')

In [4]:
# from model import Model
from dataset import hierarchical_dataset, AlignCollate, Batch_Balanced_Dataset
from utils import (
    CTCLabelConverter,
    CTCLabelConverterForBaiduWarpctc,
    AttnLabelConverter,
    Averager,
)
import numpy as np
import torch.utils.data
import torch.optim as optim
import torch.nn.init as init
import torch.backends.cudnn as cudnn
import torch
import argparse
import string
import random
import time
import os
from sklearn.metrics import *

# from test import validation

In [5]:
import random
import pandas as pd
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision.models import resnet18, resnet34, resnet50, resnet101
from torchvision import transforms

from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

import warnings
warnings.filterwarnings(action='ignore')

In [6]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

In [7]:
# Hyper-parameters and data-loading settings shared by the cells below.
CFG = {
    'IMG_HEIGHT_SIZE': 64,  # images are resized to this height in CustomDataset
    'IMG_WIDTH_SIZE': 256,  # images are resized to this width in CustomDataset
    'EPOCHS': 50,
    'LEARNING_RATE': 1e-3,
    'BATCH_SIZE': 180,
    'NUM_WORKERS': 12,  # tune to match your own GPU / CPU environment
    'SEED': 41,
    'INPUT_CHANNEL': 3,  # passed to the TPS transformation as I_channel_num
    'HIDDEN_N': 64  # only read for Model.SequenceModeling_output / Model.Prediction
}

In [8]:
def seed_everything(seed):
    """Seed every random number generator used by this notebook.

    Seeds Python's ``random`` module, NumPy and PyTorch (CPU and every CUDA
    device), exports ``PYTHONHASHSEED`` and configures cuDNN so that repeated
    runs select the same kernels.

    Args:
        seed: Integer seed applied to all generators.
    """
    random.seed(seed)
    # Only affects interpreters started after this point (e.g. worker
    # processes); the hash seed of the running kernel is already fixed.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    # Seed all visible GPUs, not just the current one; a no-op without CUDA.
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # Benchmark mode auto-tunes the convolution algorithm per run, which can
    # pick different kernels between runs; it must be off for reproducibility.
    torch.backends.cudnn.benchmark = False


seed_everything(CFG['SEED'])  # fix the seed for reproducibility

In [9]:
# Load the training annotations; later cells read its `label` and `img_path` columns.
df = pd.read_csv('./train.csv')

In [10]:
# The single-character samples in the provided training data cover every
# character that appears in the train/test data, so they are all placed in the
# training set first.
df['len'] = df['label'].str.len()
train_v1 = df[df['len'] == 1]

In [11]:
# For samples with two or more characters, split into Train (80%) /
# Validation (20%), intended to take word length into account.
# NOTE(review): no `stratify=` argument is passed, so `df['len']` is merely
# split alongside `df` and discarded; the split is not balanced by length —
# confirm whether stratification was intended.
df = df[df['len'] > 1]
train_v2, val, _, _ = train_test_split(
    df, df['len'], test_size=0.2, random_state=CFG['SEED'])

In [12]:
# Concatenate the single-character samples with the multi-character training
# split to form the final training set.
# NOTE(review): the name `train` is rebound to a function by a later cell, so
# re-running cells that read this DataFrame after that point will fail.
train = pd.concat([train_v1, train_v2])
print(len(train), len(val))

66251 10637


In [13]:
# Build the vocabulary from the training labels: every distinct character,
# in sorted order.
train_gt = "".join(train['label'])
letters = sorted(set(train_gt))
print(len(letters))

2349


In [14]:
# NOTE(review): recomputes the same `train_gt` string as the vocabulary cell
# above; this cell looks redundant.
train_gt = [gt for gt in train['label']]
train_gt = "".join(train_gt)

In [15]:
# NOTE(review): same expression as in the earlier vocabulary cell; looks redundant.
letters = sorted(list(set(list(train_gt))))

In [16]:
# "-" is placed at index 0; the same assignment is repeated in a later cell.
vocabulary = ["-"] + letters

In [17]:
# Index <-> character lookup tables; the same two lines are repeated in the next cell.
idx2char = {k: v for k, v in enumerate(vocabulary, start=0)}
char2idx = {v: k for k, v in idx2char.items()}

In [18]:
# Index 0 is reserved for "-", which the CTC loss below uses as its blank symbol.
# NOTE(review): if "-" also occurs in `letters`, char2idx["-"] resolves to the
# later index rather than 0 — confirm that no label contains "-".
vocabulary = ["-", *letters]
print(len(vocabulary))
idx2char = dict(enumerate(vocabulary))
char2idx = {char: index for index, char in idx2char.items()}

2350


In [19]:
class CustomDataset(Dataset):
    """Dataset of image files with optional text labels.

    Each item is the image at ``img_path_list[index]`` converted to RGB, resized
    to ``CFG['IMG_HEIGHT_SIZE'] x CFG['IMG_WIDTH_SIZE']``, turned into a tensor
    and normalised. When ``label_list`` is given, the item is an
    ``(image, text)`` pair; otherwise it is the image alone.
    """

    def __init__(self, img_path_list, label_list, train_mode=True):
        """Store the paths, the labels (or ``None``) and the transform mode."""
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.train_mode = train_mode

    def __len__(self):
        """Return the number of images."""
        return len(self.img_path_list)

    def __getitem__(self, index):
        """Load, preprocess and return the sample at ``index``."""
        rgb_image = Image.open(self.img_path_list[index]).convert('RGB')

        # Pick the pipeline matching the mode this dataset was built with.
        preprocess = self.train_transform if self.train_mode else self.test_transform
        tensor = preprocess(rgb_image)

        if self.label_list is None:
            return tensor
        return tensor, self.label_list[index]

    # Image Augmentation
    def train_transform(self, image):
        """Resize, convert to tensor and normalise a training image."""
        target_size = (CFG['IMG_HEIGHT_SIZE'], CFG['IMG_WIDTH_SIZE'])
        pipeline = transforms.Compose([
            transforms.Resize(target_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.485, 0.456, 0.406),
                                 std=(0.229, 0.224, 0.225)),
        ])
        return pipeline(image)

    def test_transform(self, image):
        """Resize, convert to tensor and normalise an evaluation image."""
        target_size = (CFG['IMG_HEIGHT_SIZE'], CFG['IMG_WIDTH_SIZE'])
        pipeline = transforms.Compose([
            transforms.Resize(target_size),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.485, 0.456, 0.406),
                                 std=(0.229, 0.224, 0.225)),
        ])
        return pipeline(image)

In [20]:
# Training data: train-mode transform, reshuffled every epoch.
train_dataset = CustomDataset(train['img_path'].values, train['label'].values)
train_loader = DataLoader(
    train_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=True, num_workers=CFG['NUM_WORKERS'])

# Validation data: use the evaluation transform (train_mode=False) so that any
# augmentation later added to train_transform never leaks into validation, and
# keep a fixed order so the per-batch loss average is comparable across epochs.
val_dataset = CustomDataset(
    val['img_path'].values, val['label'].values, train_mode=False)
val_loader = DataLoader(
    val_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=CFG['NUM_WORKERS'])

In [21]:
# image_batch, text_batch = iter(train_loader).next()
# print(image_batch.size(), text_batch)

In [22]:
import torch.nn as nn

from modules.transformation import TPS_SpatialTransformerNetwork
from modules.feature_extraction import VGG_FeatureExtractor, RCNN_FeatureExtractor, ResNet_FeatureExtractor
from modules.sequence_modeling import BidirectionalLSTM
from modules.prediction import Attention


class Model(nn.Module):
    """Text recogniser: TPS rectification -> truncated ResNet-34 -> BiLSTM -> linear.

    ``forward`` uses ``Transformation``, ``feature_extract``, ``linear1``,
    ``SequenceModeling`` and ``linear2``. The ``rnn`` and ``Prediction`` layers
    and ``SequenceModeling_output`` are created in ``__init__`` but never used
    by ``forward``; they still contribute parameters and state_dict keys.
    """

    def __init__(self, num_class=len(char2idx)):
        """Build all layers.

        Args:
            num_class: Size of the output layer. The default is evaluated once,
                when this class statement runs, from the ``char2idx`` that
                exists at that moment.
        """
        super(Model, self).__init__()
        rnn_hidden_size = 256
        self.Transformation = TPS_SpatialTransformerNetwork(
            F=20, I_size=(CFG['IMG_HEIGHT_SIZE'], CFG['IMG_WIDTH_SIZE']), I_r_size=(CFG['IMG_HEIGHT_SIZE'], CFG['IMG_WIDTH_SIZE']), I_channel_num=CFG['INPUT_CHANNEL'])

        """ FeatureExtraction """

        resnet = resnet34(pretrained=True)
        # CNN Feature Extract
        # Drop the last three child modules of the ResNet (presumably layer4,
        # avgpool and fc — confirm against the installed torchvision).
        resnet_modules = list(resnet.children())[:-3]
        self.feature_extract = nn.Sequential(
            *resnet_modules,
            nn.Conv2d(256, 256, kernel_size=(3, 6), stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(inplace=True)
        )
        # 1024 input features = channels * height of the flattened feature map
        # (presumably 256 * 4 for a 64-pixel-high input — TODO confirm).
        self.linear1 = nn.Linear(1024, rnn_hidden_size)

#         self.FeatureExtraction = ResNet_FeatureExtractor(
#             CFG['INPUT_CHANNEL'], CFG['HIDDEN_N'])
#         self.FeatureExtraction_output = CFG['HIDDEN_N']
#         self.AdaptiveAvgPool = nn.AdaptiveAvgPool2d(
#             (None, 1))  # Transform final (imgH/16-1) -> 1

        # NOTE(review): self.rnn is never called in forward().
        self.rnn = nn.RNN(input_size=rnn_hidden_size,
                          hidden_size=rnn_hidden_size,
                          bidirectional=True,
                          batch_first=True)
        self.linear2 = nn.Linear(rnn_hidden_size, num_class)

        """ Sequence modeling"""
        self.SequenceModeling = nn.Sequential(
            #             BidirectionalLSTM(self.FeatureExtraction_output,
            #                               rnn_hidden_size, rnn_hidden_size),
            BidirectionalLSTM(rnn_hidden_size, rnn_hidden_size, rnn_hidden_size))
        # NOTE(review): set to CFG['HIDDEN_N'] although the BidirectionalLSTM
        # above is built with rnn_hidden_size as its last argument; only
        # self.Prediction reads this value.
        self.SequenceModeling_output = CFG['HIDDEN_N']

        """ Prediction """
        # NOTE(review): self.Prediction is never called in forward(); linear2 is
        # the output layer actually used.
        self.Prediction = nn.Linear(self.SequenceModeling_output, num_class)

    def forward(self, x):
        """Compute per-frame class scores for a batch of images.

        Args:
            x: Image batch fed to the TPS transformation; assumed to be
                [batch, channels, height, width] with the sizes configured in
                CFG — TODO confirm against the data loader.

        Returns:
            The output of ``linear2`` with its first two axes swapped, i.e.
            [T, batch_size, num_classes] according to the comment below.
        """
        # Transformation stage
        input = self.Transformation(x)

        """ Feature extraction stage """
        visual_feature = self.feature_extract(input)
        # [b, c, h, w] -> [b, w, c, h], so the width axis becomes the sequence axis.
        visual_feature = visual_feature.permute(0, 3, 1, 2)

        ###
        batch_size = visual_feature.size(0)
        T = visual_feature.size(1)
        # [batch_size, T==width, num_features==channels*height]
        visual_feature = visual_feature.view(batch_size, T, -1)
        visual_feature = self.linear1(visual_feature)


#         visual_feature = self.AdaptiveAvgPool(
#             visual_feature.permute(0, 3, 1, 2))  # [b, c, h, w] -> [b, w, c, h]
#         visual_feature = visual_feature.squeeze(3)

        """ Sequence modeling stage """
        contextual_feature = self.SequenceModeling(visual_feature)

        """ Prediction stage """

        output = self.linear2(contextual_feature)
        # [T, batch_size, num_classes]
        # NOTE(review): this comment originally said T==10; with a 256-pixel-wide
        # input and the (3, 6) convolution above, T looks like 13 — confirm.
        prediction = output.permute(1, 0, 2)

#         prediction = self.Prediction(contextual_feature.contiguous())
#         prediction = prediction.permute(1, 0, 2)

        return prediction

In [23]:
# CTC loss with index 0 ('-') as the blank symbol.
# zero_infinity=True zeroes the loss (and its gradient) for any sample whose
# label cannot be aligned to the available time steps, instead of producing an
# infinite loss that would turn the whole batch's gradients into NaN.
criterion = nn.CTCLoss(blank=0, zero_infinity=True)

In [24]:
def encode_text_batch(text_batch):
    """Encode a batch of label strings for ``nn.CTCLoss``.

    Args:
        text_batch: Sequence of label strings.

    Returns:
        A pair ``(targets, target_lengths)`` of ``torch.IntTensor``:
        ``targets`` holds the ``char2idx`` index of every character of all
        labels concatenated in order, and ``target_lengths`` holds the length
        of each individual label.
    """
    lengths = torch.IntTensor([len(label) for label in text_batch])

    # All labels are flattened into one 1-D target sequence.
    flat_indices = [char2idx[char] for char in "".join(text_batch)]
    targets = torch.IntTensor(flat_indices)

    return targets, lengths

In [25]:
def compute_loss(text_batch, text_batch_logits):
    """Compute the CTC loss of a batch of predictions against its labels.

    Args:
        text_batch: list of strings of length equal to batch size
        text_batch_logits: Tensor of size([T, batch_size, num_classes])

    Returns:
        The value returned by the module-level ``criterion``.
    """
    log_probs = F.log_softmax(text_batch_logits, 2)  # [T, batch_size, num_classes]

    # Every sample in the batch uses all T time steps as its input length.
    num_steps = log_probs.size(0)
    batch_size = log_probs.size(1)
    input_lengths = torch.full(size=(batch_size,),
                               fill_value=num_steps,
                               dtype=torch.int32).to(device)  # [batch_size]

    targets, target_lengths = encode_text_batch(text_batch)
    return criterion(log_probs, targets, input_lengths, target_lengths)

In [38]:
def decode_predictions(text_batch_logits):
    """Greedy-decode logits into raw per-frame strings.

    The most likely class is taken at every time step and mapped through
    ``idx2char``. No CTC collapsing is done here: repeated characters and the
    '-' blank are kept in the returned strings.

    Args:
        text_batch_logits: Tensor of size([T, batch_size, num_classes]).

    Returns:
        List of ``batch_size`` strings, each of length T.
    """
    # argmax over the raw logits selects the same class as argmax over their
    # softmax, so the softmax is skipped. detach()/cpu() make the function safe
    # for tensors that still live on the GPU or track gradients.
    best_tokens = text_batch_logits.detach().argmax(2).cpu()  # [T, batch_size]
    per_sample_tokens = best_tokens.transpose(0, 1).tolist()  # [batch_size, T]

    return ["".join(idx2char[idx] for idx in tokens)
            for tokens in per_sample_tokens]

In [48]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    """Train ``model`` with CTC loss and keep the best-validation weights.

    Runs ``CFG['EPOCHS']`` epochs. Each epoch performs one optimisation pass
    over ``train_loader`` and then evaluates on ``val_loader`` via
    ``validation``. Whenever the validation loss improves, a copy of the
    weights is kept and also written to './model_2_best.pth'.

    Note: this function rebinds the notebook-level name ``train``, which an
    earlier cell uses for the training DataFrame.

    Args:
        model: Network to train; moved to ``device``.
        optimizer: Optimizer over ``model``'s parameters.
        train_loader: Loader yielding ``(image_batch, text_batch)`` pairs.
        val_loader: Loader passed to ``validation``.
        scheduler: Optional scheduler stepped with the validation loss, or None.
        device: Device the model and image batches are moved to.

    Returns:
        ``model`` with the best-validation weights loaded, or ``None`` if the
        validation loss never improved (for example, it was always NaN).
    """
    import copy

    model.to(device)

    best_loss = float('inf')
    best_state = None
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for image_batch, text_batch in tqdm(iter(train_loader)):
            image_batch = image_batch.to(device)

            optimizer.zero_grad()
            text_batch_logits = model(image_batch)
            loss = compute_loss(text_batch, text_batch_logits)
            loss.backward()
            optimizer.step()
            train_loss.append(loss.item())

        _train_loss = np.mean(train_loss)
        _val_loss = validation(model, val_loader, device)
        print(
            f'Epoch : [{epoch}] Train CTC Loss : [{_train_loss:.5f}] Val CTC Loss : [{_val_loss:.5f}]')

        if scheduler is not None:
            scheduler.step(_val_loss)

        if best_loss > _val_loss:
            best_loss = _val_loss
            # Snapshot the weights: keeping a reference to `model` would alias
            # the live network, which later epochs keep updating.
            best_state = copy.deepcopy(model.state_dict())
            torch.save(best_state, './model_2_best.pth')

    if best_state is None:
        return None

    model.load_state_dict(best_state)
    return model

In [None]:
#     preds = []
#     with torch.no_grad():
#         for image_batch in tqdm(iter(test_loader)):
#             image_batch = image_batch.to(device)

#             text_batch_logits = model(image_batch)

#             text_batch_pred = decode_predictions(text_batch_logits.cpu())

#             preds.extend(text_batch_pred)
#     return preds

In [56]:
def _ctc_collapse(raw_text, blank="-"):
    """Collapse a raw per-frame CTC string: merge repeats, then drop blanks."""
    kept = []
    previous = None
    for char in raw_text:
        if char != previous and char != blank:
            kept.append(char)
        previous = char
    return "".join(kept)


def validation(model, val_loader, device):
    """Evaluate ``model`` on ``val_loader``.

    Prints the word-level accuracy (exact match between the CTC-collapsed
    prediction and the label, per sample) and returns the mean of the
    per-batch CTC losses.

    Args:
        model: Network to evaluate; switched to eval mode.
        val_loader: Loader yielding ``(image_batch, text_batch)`` pairs.
        device: Device the image batches are moved to.

    Returns:
        Mean of the per-batch validation losses.
    """
    model.eval()
    val_loss = []
    preds = []
    labels = []
    with torch.no_grad():
        for image_batch, text_batch in tqdm(iter(val_loader)):
            image_batch = image_batch.to(device)

            text_batch_logits = model(image_batch)
            loss = compute_loss(text_batch, text_batch_logits)
            val_loss.append(loss.item())

            # Raw decodes still contain blanks and repeated frames; collapse
            # them so they can be compared with the labels. extend() keeps one
            # entry per sample so accuracy is per word, not per batch.
            text_batch_pred = decode_predictions(text_batch_logits.cpu())
            preds.extend(_ctc_collapse(text) for text in text_batch_pred)
            labels.extend(text_batch)

    _val_loss = np.mean(val_loss)
    sample_num = len(labels)
    acc_cnt = sum(pred == label for pred, label in zip(preds, labels))
    # Guard against an empty loader.
    acc = float(acc_cnt / sample_num) if sample_num else 0.0
    print(f'ACC : {acc:.5f}')

    return _val_loss

SyntaxError: invalid syntax (2093683311.py, line 18)

In [55]:
# Build a fresh model, its optimizer and LR scheduler, then start training.
# The commented-out lines show how to resume from './model_2_best.pth' instead.
# model = RecognitionModel()

# checkpoint = torch.load('./model_2_best.pth')
model = Model(len(char2idx))
# model.load_state_dict(checkpoint)
# model.load_state_dict(checkpoint, strict=False)

# model.eval()

optimizer = torch.optim.Adam(
    params=model.parameters(), lr=CFG["LEARNING_RATE"])

# optimizer = torch.optim.Adadelta(
#     params=model.parameters(), lr=CFG["LEARNING_RATE"], rho=0.95, eps=1e-8)
# The scheduler is stepped with the validation loss inside train().
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer, mode='min', factor=0.5, patience=2, threshold_mode='abs', min_lr=1e-8, verbose=True)

# NOTE(review): the inference cell below uses `model`, not `infer_model`.
infer_model = train(model, optimizer, train_loader,
                    val_loader, scheduler, device)

  0%|          | 0/415 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
# NOTE(review): writes the current weights of `model` to the same path that
# train() uses for its best-validation checkpoint, so running this cell
# replaces that file.
torch.save(model.state_dict(), './model_2_best.pth')

In [40]:
# Load the test annotations; only the `img_path` column is read below.
test = pd.read_csv('./test.csv')

In [41]:
# Unlabelled test data: use the evaluation transform (train_mode=False) so that
# any augmentation later added to train_transform is never applied at inference
# time, and keep the file order so predictions line up with the submission rows.
test_dataset = CustomDataset(test['img_path'].values, None, train_mode=False)
test_loader = DataLoader(
    test_dataset, batch_size=CFG['BATCH_SIZE'], shuffle=False, num_workers=CFG['NUM_WORKERS'])

In [42]:
def inference(model, test_loader, device):
    """Run ``model`` over ``test_loader`` and return the raw decoded strings.

    Args:
        model: Network to run; switched to eval mode.
        test_loader: Loader yielding image batches without labels.
        device: Device the image batches are moved to.

    Returns:
        Flat list with one ``decode_predictions`` string per input sample, in
        loader order.
    """
    model.eval()
    decoded = []
    with torch.no_grad():
        for batch in tqdm(iter(test_loader)):
            logits = model(batch.to(device))
            # Decoding happens on the CPU copy of the logits.
            decoded += decode_predictions(logits.cpu())
    return decoded

In [43]:
# NOTE(review): runs the notebook-level `model`, not the `infer_model` returned
# by train() — confirm which weights are meant to be used here.
predictions = inference(model, test_loader, device)

  0%|          | 0/464 [00:00<?, ?it/s]

In [44]:
# Post-process the inference result of each sample independently.
def remove_duplicates(text):
    """Collapse each run of identical consecutive characters to one character.

    Returns an empty string for empty input.
    """
    if not text:
        return ""

    kept = [text[0]]
    for position in range(1, len(text)):
        if text[position] != text[position - 1]:
            kept.append(text[position])
    return "".join(kept)


def correct_prediction(word):
    """Turn a raw per-frame string into the final text.

    The string is split on '-', repeated characters inside each piece are
    merged, and the pieces are joined back together without the '-'.
    """
    return "".join(remove_duplicates(piece) for piece in word.split("-"))

In [45]:
# Fill the sample submission with the raw predictions, then collapse each one
# with correct_prediction (drops '-' and merges repeated characters).
submit = pd.read_csv('./sample_submission.csv')
submit['label'] = predictions
submit['label'] = submit['label'].apply(correct_prediction)

In [46]:
# Write the submission file without the DataFrame index column.
submit.to_csv('./submission_rev.csv', index=False)