## Import

In [4]:
import random
import pandas as pd
import numpy as np
import os
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from torchvision.models import resnet50
from torchvision.models import resnet18
from torchvision import transforms

from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split

import warnings
warnings.filterwarnings(action='ignore') 

In [5]:
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

## Hyperparameter Setting

In [6]:
CFG = {
    'IMG_HEIGHT_SIZE':64,
    'IMG_WIDTH_SIZE':224,
    'EPOCHS':10,
    'LEARNING_RATE':1e-3,
    'BATCH_SIZE':256,
    'NUM_WORKERS':0, # 본인의 GPU, CPU 환경에 맞게 설정
    'SEED':41
}

## Fixed RandomSeed

In [7]:
def seed_everything(seed):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = True

seed_everything(CFG['SEED']) # Seed 고정

## Data Load & Train/Validation Split

In [8]:
df = pd.read_csv('../train.csv')

In [9]:
# 제공된 학습데이터 중 1글자 샘플들의 단어사전이 학습/테스트 데이터의 모든 글자를 담고 있으므로 학습 데이터로 우선 배치
df['len'] = df['label'].str.len()
train_v1 = df[df['len']==1]

In [10]:
# 제공된 학습데이터 중 2글자 이상의 샘플들에 대해서 단어길이를 고려하여 Train (80%) / Validation (20%) 분할
df = df[df['len']>1]
train_v2, val, _, _ = train_test_split(df, df['len'], test_size=0.2, random_state=CFG['SEED'])

In [11]:
# 학습 데이터로 우선 배치한 1글자 샘플들과 분할된 2글자 이상의 학습 샘플을 concat하여 최종 학습 데이터로 사용
train = pd.concat([train_v1, train_v2])
print(len(train), len(val))

66251 10637


In [12]:
default_path = "C:/Users/Jo/PYDATAexam/train/" 
train = pd.read_csv('C:/Users/Jo/PYDATAexam/train.csv')

df['img_path'] = train['img_path'].apply(lambda x: default_path + x.split('/')[2])

In [13]:
df

Unnamed: 0,id,img_path,label,len
0,TRAIN_00000,C:/Users/Jo/PYDATAexam/train/TRAIN_00000.png,빨간색,3
2,TRAIN_00002,C:/Users/Jo/PYDATAexam/train/TRAIN_00002.png,차차,2
4,TRAIN_00004,C:/Users/Jo/PYDATAexam/train/TRAIN_00004.png,놓치다,3
5,TRAIN_00005,C:/Users/Jo/PYDATAexam/train/TRAIN_00005.png,오래도록,4
6,TRAIN_00006,C:/Users/Jo/PYDATAexam/train/TRAIN_00006.png,유월,2
...,...,...,...,...
76881,TRAIN_76881,C:/Users/Jo/PYDATAexam/train/TRAIN_76881.png,구분하다,4
76882,TRAIN_76882,C:/Users/Jo/PYDATAexam/train/TRAIN_76882.png,하나하나,4
76884,TRAIN_76884,C:/Users/Jo/PYDATAexam/train/TRAIN_76884.png,겪다,2
76885,TRAIN_76885,C:/Users/Jo/PYDATAexam/train/TRAIN_76885.png,벨트,2


In [10]:
val['img_path'] = val['img_path'].apply(lambda x: default_path + x.split('/')[2])

In [11]:
default_path = "C:/Users/Jo/PYDATAexam/test/" 
test = pd.read_csv("C:/Users/Jo/PYDATAexam/test.csv")

test['img_path'] = test['img_path'].apply(lambda x: default_path + x.split('/')[2])

## Get Vocabulary

In [12]:
# 학습 데이터로부터 단어 사전(Vocabulary) 구축
train_gt = [gt for gt in df['label']]
train_gt = "".join(train_gt)
letters = sorted(list(set(list(train_gt))))
print(len(letters))

805


In [13]:
vocabulary = ["-"] + letters
print(len(vocabulary))
idx2char = {k:v for k,v in enumerate(vocabulary, start=0)}
char2idx = {v:k for k,v in idx2char.items()}

806


## CustomDataset

In [14]:
class CustomDataset(Dataset):
    def __init__(self, img_path_list, label_list, train_mode=True):
        self.img_path_list = img_path_list
        self.label_list = label_list
        self.train_mode = train_mode
        
    def __len__(self):
        return len(self.img_path_list)
    
    def __getitem__(self, index):
        image = Image.open(self.img_path_list[index]).convert('RGB')
        
        if self.train_mode:
            image = self.train_transform(image)
        else:
            image = self.test_transform(image)
            
        if self.label_list is not None:
            text = self.label_list[index]
            return image, text
        else:
            return image
    
    # Image Augmentation
    def train_transform(self, image):
        transform_ops = transforms.Compose([
            transforms.Resize((CFG['IMG_HEIGHT_SIZE'],CFG['IMG_WIDTH_SIZE'])),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
        ])
        return transform_ops(image)
    
    def test_transform(self, image):
        transform_ops = transforms.Compose([
            transforms.Resize((CFG['IMG_HEIGHT_SIZE'],CFG['IMG_WIDTH_SIZE'])),
            transforms.ToTensor(),
            transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225))
        ])
        return transform_ops(image)

In [15]:
train_dataset = CustomDataset(df['img_path'].values, df['label'].values)

train_loader = DataLoader(train_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

val_dataset = CustomDataset(val['img_path'].values, val['label'].values)

val_loader = DataLoader(val_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=True, num_workers=0)

In [16]:
df.head()

Unnamed: 0,id,img_path,label,len
0,TRAIN_00000,C:/Users/Jo/PYDATAexam/train/TRAIN_00000.png,빨간색,3
2,TRAIN_00002,C:/Users/Jo/PYDATAexam/train/TRAIN_00002.png,차차,2
4,TRAIN_00004,C:/Users/Jo/PYDATAexam/train/TRAIN_00004.png,놓치다,3
5,TRAIN_00005,C:/Users/Jo/PYDATAexam/train/TRAIN_00005.png,오래도록,4
6,TRAIN_00006,C:/Users/Jo/PYDATAexam/train/TRAIN_00006.png,유월,2


In [17]:
image_batch, text_batch = next(iter(train_loader))
print(image_batch.size(), text_batch)

torch.Size([256, 3, 64, 224]) ('치료', '회장', '부정', '길이', '매일', '생선', '잔디밭', '작은딸', '완성하다', '고맙다', '부잣집', '놔두다', '이웃', '첫째', '소비자', '방문', '기혼', '노동자', '미용실', '인제', '축구공', '주름살', '전개되다', '개월', '원서', '위험하다', '어쩌다', '식생활', '사물', '분홍색', '버리다', '축구장', '담당하다', '뒤늦다', '예매하다', '찍다', '자장면', '정해지다', '아무', '자장면', '어색하다', '년생', '백색', '흥분하다', '기획', '이웃집', '자극하다', '두부', '와인', '위험하다', '그야말로', '일하다', '진행되다', '여행', '이렇다', '뜻대로', '본격적', '생신', '다음', '변호사', '특별하다', '정신적', '말씀', '바깥', '재미', '미술관', '초보자', '연기되다', '의미하다', '서울', '쓰다듬다', '넘어뜨리다', '걸리다', '빗방울', '상표', '힘들어하다', '절약', '심각해지다', '시골', '레이저', '통합', '일찍이', '넘어뜨리다', '일주일', '당연하다', '유명하다', '년생', '앞서다', '여군', '바닥', '특이하다', '정리되다', '상대', '중단', '바퀴', '모범', '어렵다', '방울', '내지', '위로', '다듬다', '저축', '열차', '드디어', '불법', '적극적', '이사', '힘들다', '참석하다', '장난', '조미료', '붙이다', '구하다', '사립', '구별되다', '아하', '선진국', '고집하다', '라디오', '올바르다', '능력', '이해', '한국적', '연관', '입장', '태우다', '거기', '소형', '나흘', '간단히', '학번', '삼계탕', '인구', '한꺼번에', '쓰다', '끊임없이', '꾸다', '묻히다', '부족', '노력', '짐작하다', '소매', '간장

## Model Define

In [18]:
from efficientnet_pytorch import EfficientNet

class RecognitionModel(nn.Module):
    def __init__(self, num_chars=len(char2idx), rnn_hidden_size=256):
        super(RecognitionModel, self).__init__()
        self.num_chars = num_chars
        self.rnn_hidden_size = rnn_hidden_size
        model = EfficientNet.from_pretrained('efficientnet-b0')
        # CNN Backbone = 사전
        #학습된 resnet18 활용
        # https://arxiv.org/abs/1512.03385
        #resnet = resnet18(pretrained=True)
        # CNN Feature Extract
        one=list(model.children())[0]
        two=list(model.children())[1]
        three=list(model.children())[2]
        four=list(model.children())[3:]
        self.feature_extract = nn.Sequential(
            one,
            two,
            *three
            #nn.Conv2d(256, 256, kernel_size=(3,6), stride=1, padding=1),
            #nn.BatchNorm2d(256),
            #nn.ReLU(inplace=True)
            
        )

        self.linear1 = nn.Linear(640, rnn_hidden_size)
        
        # RNN
        self.rnn = nn.RNN(input_size=rnn_hidden_size, 
                            hidden_size=rnn_hidden_size,
                            bidirectional=True, 
                            batch_first=True)
        self.linear2 = nn.Linear(self.rnn_hidden_size*2, num_chars)
        
        
    def forward(self, x):
        # CNN
        x = self.feature_extract(x) # [batch_size, channels, height, width]
        x = x.permute(0, 3, 1, 2) # [batch_size, width, channels, height]
        batch_size = x.size(0)
        T = x.size(1)
        x = x.view(batch_size, T, -1) # [batch_size, T==width, num_features==channels*height]
        #print(x.size())
        x = self.linear1(x)
        #print(x.size())
        # RNN
        x, hidden = self.rnn(x)
        
        output = self.linear2(x)
        output = output.permute(1, 0, 2) # [T==10, batch_size, num_classes==num_features]
        
        return output

## Define CTC Loss

In [19]:
criterion = nn.CTCLoss(blank=0) # idx 0 : '-'

In [20]:
def encode_text_batch(text_batch):
    text_batch_targets_lens = [len(text) for text in text_batch]
    text_batch_targets_lens = torch.IntTensor(text_batch_targets_lens)
    
    text_batch_concat = "".join(text_batch)
    text_batch_targets = [char2idx[c] for c in text_batch_concat]
    text_batch_targets = torch.IntTensor(text_batch_targets)
    
    return text_batch_targets, text_batch_targets_lens

In [21]:
def compute_loss(text_batch, text_batch_logits):
    """
    text_batch: list of strings of length equal to batch size
    text_batch_logits: Tensor of size([T, batch_size, num_classes])
    """
    text_batch_logps = F.log_softmax(text_batch_logits, 2) # [T, batch_size, num_classes]  
    text_batch_logps_lens = torch.full(size=(text_batch_logps.size(1),), 
                                       fill_value=text_batch_logps.size(0), 
                                       dtype=torch.int32).to(device) # [batch_size] 

    text_batch_targets, text_batch_targets_lens = encode_text_batch(text_batch)
    loss = criterion(text_batch_logps, text_batch_targets, text_batch_logps_lens, text_batch_targets_lens)

    return loss

## Train

In [22]:
def train(model, optimizer, train_loader, val_loader, scheduler, device):
    model.to(device)
    
    best_loss = 999999
    best_model = None
    for epoch in range(1, CFG['EPOCHS']+1):
        model.train()
        train_loss = []
        for image_batch, text_batch in tqdm(iter(train_loader)):
            image_batch = image_batch.to(device)
            
            optimizer.zero_grad()
            
            text_batch_logits = model(image_batch)
            loss = compute_loss(text_batch, text_batch_logits)
            
            loss.backward()
            optimizer.step()
            
            train_loss.append(loss.item())
        
        _train_loss = np.mean(train_loss)
        
        _val_loss = validation(model, val_loader, device)
        print(f'Epoch : [{epoch}] Train CTC Loss : [{_train_loss:.5f}] Val CTC Loss : [{_val_loss:.5f}]')
        
        if scheduler is not None:
            scheduler.step(_val_loss)
        
        if best_loss > _val_loss:
            best_loss = _val_loss
            best_model = model
    
    return best_model

## Validation

In [23]:
def validation(model, val_loader, device):
    model.eval()
    val_loss = []
    with torch.no_grad():
        for image_batch, text_batch in tqdm(iter(val_loader)):
            image_batch = image_batch.to(device)
            
            text_batch_logits = model(image_batch)
            loss = compute_loss(text_batch, text_batch_logits)
            
            val_loss.append(loss.item())
    
    _val_loss = np.mean(val_loss)
    return _val_loss

## Run!!

In [24]:
#model = RecognitionModel()
model = torch.load("./model.pt")
model.eval()
optimizer = torch.optim.Adam(params = model.parameters(), lr = CFG["LEARNING_RATE"])
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=2,threshold_mode='abs',min_lr=1e-8, verbose=True)

infer_model = train(model, optimizer, train_loader, val_loader, scheduler, device)

  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [1] Train CTC Loss : [0.04241] Val CTC Loss : [0.05851]


  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [2] Train CTC Loss : [0.03973] Val CTC Loss : [0.06129]


  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [3] Train CTC Loss : [0.03386] Val CTC Loss : [0.04862]


  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [4] Train CTC Loss : [0.02771] Val CTC Loss : [0.02637]


  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [5] Train CTC Loss : [0.03095] Val CTC Loss : [0.03425]


  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [6] Train CTC Loss : [0.03143] Val CTC Loss : [0.04376]


  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [7] Train CTC Loss : [0.02663] Val CTC Loss : [0.02429]


  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [8] Train CTC Loss : [0.02337] Val CTC Loss : [0.05572]


  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [9] Train CTC Loss : [0.02861] Val CTC Loss : [0.04019]


  0%|          | 0/208 [00:00<?, ?it/s]

  0%|          | 0/42 [00:00<?, ?it/s]

Epoch : [10] Train CTC Loss : [0.03298] Val CTC Loss : [0.02874]
Epoch 00010: reducing learning rate of group 0 to 5.0000e-04.


## Inference

In [25]:
test = pd.read_csv('../test.csv')

In [26]:
default_path = "C:/Users/Jo/PYDATAexam/test/" 
test = pd.read_csv("C:/Users/Jo/PYDATAexam/test.csv")

test['img_path'] = test['img_path'].apply(lambda x: default_path + x.split('/')[2])

In [27]:
test_dataset = CustomDataset(test['img_path'].values, None, False)
test_loader = DataLoader(test_dataset, batch_size = CFG['BATCH_SIZE'], shuffle=False, num_workers=CFG['NUM_WORKERS'])

In [28]:
def decode_predictions(text_batch_logits):
    text_batch_tokens = F.softmax(text_batch_logits, 2).argmax(2) # [T, batch_size]
    text_batch_tokens = text_batch_tokens.numpy().T # [batch_size, T]

    text_batch_tokens_new = []
    for text_tokens in text_batch_tokens:
        text = [idx2char[idx] for idx in text_tokens]
        text = "".join(text)
        text_batch_tokens_new.append(text)

    return text_batch_tokens_new

def inference(model, test_loader, device):
    model.eval()
    preds = []
    with torch.no_grad():
        for image_batch in tqdm(iter(test_loader)):
            image_batch = image_batch.to(device)
            
            text_batch_logits = model(image_batch)
            
            text_batch_pred = decode_predictions(text_batch_logits.cpu())
            
            preds.extend(text_batch_pred)
    return preds

In [29]:
predictions = inference(infer_model, test_loader, device)

  0%|          | 0/290 [00:00<?, ?it/s]

## Submission

In [30]:
# 샘플 별 추론결과를 독립적으로 후처리
def remove_duplicates(text):
    if len(text) > 1:
        letters = [text[0]] + [letter for idx, letter in enumerate(text[1:], start=1) if text[idx] != text[idx-1]]
    elif len(text) == 1:
        letters = [text[0]]
    else:
        return ""
    return "".join(letters)

def correct_prediction(word):
    parts = word.split("-")
    parts = [remove_duplicates(part) for part in parts]
    corrected_word = "".join(parts)
    return corrected_word

In [31]:
submit = pd.read_csv('../sample_submission.csv')
submit['label'] = predictions
submit['label'] = submit['label'].apply(correct_prediction)

In [32]:
submit.to_csv('./submission.csv', index=False)