In [1]:
!pip install pytorch_lightning
!pip install timm
!pip install einops

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


## 라이브러리 호출

학습에 필요한 라이브러리를 호출한다.

In [2]:
import timm
import random
import os

import pandas as pd
import numpy as np

from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import pytorch_lightning as pl

from torch.utils.data import Dataset, DataLoader
from torchvision.models import resnet18
from torchvision import transforms

import albumentations as A
import albumentations.pytorch

from tqdm.auto import tqdm
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold

from einops import rearrange, reduce, repeat

import gc

import warnings
warnings.filterwarnings(action='ignore') 

## 구글 드라이브 연결

구글 코랩에서 학습을 수행하기 위해서는 먼저 구글 드라이브를 코랩에 연결(마운트)해야 한다. 아래와 같이 수행하면 된다.

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [4]:
!unzip "/content/drive/MyDrive/Colab Notebooks/DACON/2023_교원그룹_AI_챌린지/datasets/open.zip"

Archive:  /content/drive/MyDrive/Colab Notebooks/DACON/2023_교원그룹_AI_챌린지/datasets/open.zip
replace sample_submission.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: N


## 시드값 고정

아래와 같이 시드값을 고정해야 매번 학습할 때마다 동일한 결과를 얻을 수 있다. 시드값은 원하는 숫자를 사용하면 되며, 여기서는 생일로 설정하였다 😊.

In [5]:
def seed_everything(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's ``random``, NumPy, and PyTorch (CPU and all CUDA devices) and
    switches cuDNN into deterministic mode.

    Args:
        seed: Integer seed applied to every generator.
    """
    random.seed(seed)
    # Only affects Python processes started after this point (e.g. worker
    # subprocesses); the hash seed of the running interpreter is already fixed.
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    # benchmark=True lets cuDNN auto-tune and pick different convolution
    # algorithms from run to run, which defeats the determinism requested above.
    torch.backends.cudnn.benchmark = False

seed_everything(428)

## 학습 데이터를 불러옴

수정

In [6]:
# Read the competition's train/test index tables from the working directory.
train_df, test_df = (pd.read_csv(csv_name) for csv_name in ('train.csv', 'test.csv'))

In [7]:
# Build the character vocabulary from the training labels:
# concatenate every label, then keep the sorted set of distinct characters.
train_gt = "".join(train_df['label'])
letters = sorted(set(train_gt))
print(len(letters))

2349


In [8]:
# "-" is placed at index 0 ahead of the real characters; the model below uses
# index 0 as the CTC blank.
vocabulary = ["-", *letters]
print(len(vocabulary))
# Index -> character and the inverse lookup.
idx2char = dict(enumerate(vocabulary))
char2idx = {char: index for index, char in idx2char.items()}

2350


In [9]:
train_df.head()

Unnamed: 0,id,img_path,label
0,TRAIN_00000,./train/TRAIN_00000.png,빨간색
1,TRAIN_00001,./train/TRAIN_00001.png,머
2,TRAIN_00002,./train/TRAIN_00002.png,차차
3,TRAIN_00003,./train/TRAIN_00003.png,써
4,TRAIN_00004,./train/TRAIN_00004.png,놓치다


In [10]:
# Number of characters in each ground-truth label.
train_df['label_len'] = train_df['label'].map(len)

In [11]:
train_df.head()

Unnamed: 0,id,img_path,label,label_len
0,TRAIN_00000,./train/TRAIN_00000.png,빨간색,3
1,TRAIN_00001,./train/TRAIN_00001.png,머,1
2,TRAIN_00002,./train/TRAIN_00002.png,차차,2
3,TRAIN_00003,./train/TRAIN_00003.png,써,1
4,TRAIN_00004,./train/TRAIN_00004.png,놓치다,3


In [12]:
import torchmetrics

In [13]:
from torch.nn.utils.rnn import pad_sequence

# Training-time pipeline: resize to 128x256 (height x width), random flip, exactly
# one of blur / distortion / noise, ImageNet-style normalisation, then to tensor.
# A RandomResizedCrop variant was tried earlier and is currently disabled.
# NOTE(review): VerticalFlip turns the text upside down for half of the samples
# while the label stays the same - confirm this augmentation is intended for OCR.
transform_train = A.Compose([
    A.Resize(128, 256),
    A.VerticalFlip(p=0.5),
    A.OneOf(
        [
            A.MotionBlur(p=0.3),
            A.OpticalDistortion(p=0.5),
            A.GaussNoise(p=0.5),
        ],
        p=1,
    ),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    A.pytorch.transforms.ToTensorV2(),
])

# Validation / test pipeline: same resize and normalisation, no augmentation.
transform_test = A.Compose([
    A.Resize(128, 256),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    A.pytorch.transforms.ToTensorV2(),
])

class TextCollator():
    """Callable that merges a list of dataset items into one batch.

    Author's note (translated): the Dataset finishes its per-item processing first;
    the items are then gathered into a list of ``batch_size`` elements and handed
    to the collator, so the collator has to handle however many items the
    DataLoader's ``batch_size`` delivers.

    Args:
        is_train: When True each sample is an ``(image, label)`` pair and labels
            are encoded and padded; otherwise each sample is a bare image tensor.
    """
    def __init__(self, is_train = False):
        self.is_train = is_train

    def __call__(self, samples):
        """Return stacked images, plus padded label indices when ``is_train``."""
        if not self.is_train:
            # Inference: samples are image tensors only.
            return torch.vstack([image.unsqueeze(0) for image in samples])

        image_rows, label_rows = [], []
        for image, label in samples:
            image_rows.append(image.unsqueeze(0))
            # NOTE(review): the trailing 2 looks like an end-of-sequence marker, but
            # index 2 is an ordinary character in `vocabulary` - confirm intent.
            label_rows.append(torch.LongTensor([char2idx[x] for x in label] + [2]))
        batch_images = torch.vstack(image_rows)
        # pad_sequence right-pads shorter label tensors with 0.
        batch_labels = torch.LongTensor(pad_sequence(label_rows, batch_first = True))
        return batch_images, batch_labels

class TextDataset(Dataset):
    """Image (and optionally label) dataset for the OCR model.

    Args:
        images: Indexable collection of image paths.
        labels: Indexable collection of label strings; required when ``is_train``
            or ``is_valid`` is set.
        is_train: Training mode - applies ``transform_train`` (with augmentation)
            and returns ``(image, label)``.
        is_valid: Validation mode - applies ``transform_test`` (no augmentation)
            and returns ``(image, label)``.

    With both flags False the dataset is in test mode and returns the image only.

    Raises:
        ValueError: If a labelled mode is requested without ``labels``.
    """
    def __init__(self, images, labels = None, is_train = False, is_valid = False):
        if (is_train or is_valid) and labels is None:
            # Fail at construction time instead of with an opaque TypeError
            # inside a DataLoader worker on the first __getitem__ call.
            raise ValueError("labels must be provided when is_train or is_valid is True")
        self.images = images
        self.labels = labels
        self.is_train = is_train
        self.is_valid = is_valid

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        """Load, transform and return the sample at ``idx``."""
        # The context manager closes the file even if decoding fails.
        with Image.open(self.images[idx]) as source:
            # np.array already copies the pixel data, so no extra .copy() is needed.
            image = np.array(source.convert("RGB"))

        # Only the training split is augmented; validation and test share a pipeline.
        pipeline = transform_train if self.is_train else transform_test
        image = pipeline(image = image)['image']

        if self.is_train or self.is_valid:
            return (image, self.labels[idx])
        return image

In [14]:
# Hyper-parameters shared by the cells below.
config = dict(
    n_splits=5,                       # not read by any visible cell - presumably the CV fold count
    random_seed=428,
    batch_size=64,                    # DataLoader batch size
    input_size=1024,                  # LSTM input feature size
    hidden_size=1024,                 # LSTM hidden size per direction
    num_layers=1,                     # number of stacked LSTM layers
    dropout=0.1,                      # LSTM dropout argument
    model='regnetx_160',              # timm backbone name
    vocabulary_len=len(vocabulary),   # number of output classes (blank included)
    accumulate_grad_batches=1,        # not read by any visible cell
    patience=20,                      # not read by any visible cell
    max_epochs=200,                   # not read by any visible cell
)

In [15]:
# eff = timm.create_model(config['model'], pretrained=True)
# # CNN Feature Extract
# eff = list(eff.children())[:-2]
# feature_extract = nn.Sequential(
#     *eff
# )
# train_dataset = TextDataset(train_df['img_path'].reset_index(drop=True), train_df['label'].reset_index(drop=True), is_train = True)
# images, labels = train_dataset[0]
# feature_extract(images.unsqueeze(0)).shape

In [16]:
# assert False

In [17]:
from pytorch_lightning.accelerators import accelerator
class OCRModel(pl.LightningModule):
    """CNN -> linear projection -> bidirectional LSTM recogniser trained with CTC loss.

    ``forward`` maps an image batch to per-time-step vocabulary logits of shape
    (T, batch_size, vocabulary_len), where T is the width of the backbone feature
    map. Vocabulary index 0 is the CTC blank.

    Args:
        config: Dict read for 'model', 'input_size', 'hidden_size', 'num_layers',
            'dropout' and 'vocabulary_len'. Two optional keys are also honoured:
            'cnn_feature_size' (channels * height of the backbone feature map,
            default 2048 * 4) and 'pretrained' (default True).
    """

    def __init__(self, config):
        # Feature-map shapes noted by the original author for other backbones:
        #   Regnetx_006 -> [1, 528, 4, 8]   (children()[:-2])
        #   Regnetx_032 -> [1, 1008, 4, 8]  (children()[:-2])
        #   Effnet      -> [2, 512, 4, 8]   (children()[:-4])
        super().__init__()
        self.config = config
        backbone = timm.create_model(config['model'], pretrained=config.get('pretrained', True))
        # Drop the last two child modules so the backbone yields a spatial feature map.
        self.eff_feature_extract = nn.Sequential(*list(backbone.children())[:-2])
        self.eff_linear = nn.Sequential(
            # The output size must match the LSTM input size, so it is read from the
            # config rather than hard-coded. The default input size 2048 * 4 matches
            # the original hard-coded value.
            nn.Linear(config.get('cnn_feature_size', 2048 * 4), config['input_size'], bias = False),
        )

        num_layers = config['num_layers']
        self.lstm = nn.LSTM(input_size = config['input_size'],
                            hidden_size = config['hidden_size'],
                            num_layers = num_layers,
                            # nn.LSTM applies dropout only between stacked layers; with a
                            # single layer a non-zero value has no effect and only warns.
                            dropout = config['dropout'] if num_layers > 1 else 0.0,
                            bidirectional = True,
                            batch_first = True)

        # Bidirectional output concatenates both directions -> hidden_size * 2.
        self.lstm_linear = nn.Linear(config['hidden_size']*2, config['vocabulary_len'])

        loss_weight = torch.ones(config['vocabulary_len'])
        loss_weight[0] = 0
        # Not used by any method of this class. Kept because the loss registers its
        # weight tensor as a buffer, so 'crit.weight' is part of existing checkpoints
        # and removing it would break strict load_state_dict.
        self.crit = nn.CrossEntropyLoss(
            weight = loss_weight
        )
        self.criterion = nn.CTCLoss(blank=0) # idx 0 : "-"

    def forward(self, x):
        """Return logits of shape (T, batch_size, vocabulary_len) for image batch ``x``."""
        representation = self.eff_feature_extract(x)
        # |representation| = (batch_size, channel, height, width)
        # Each feature-map column becomes one time step: flatten (channel, height).
        representation = rearrange(representation, 'b c h w -> b w (c h)')
        representation = self.eff_linear(representation)
        # |representation| = (batch_size, width, input_size)

        context, _ = self.lstm(representation)
        # |context| = (batch_size, width, hidden_size * 2)
        context = self.lstm_linear(context)
        # |context| = (batch_size, width, vocabulary_len)

        # nn.CTCLoss expects time-major input.
        return rearrange(context, 'b t v -> t b v')

    def remove_duplicates(self, text):
        """Collapse each run of identical consecutive characters in ``text`` to one."""
        return "".join(
            letter for position, letter in enumerate(text)
            if position == 0 or letter != text[position - 1]
        )

    def correct_prediction(self, word):
        """Post-process one greedy prediction: collapse repeats within each
        "-"-separated segment, then drop the "-" blanks."""
        return "".join(self.remove_duplicates(part) for part in word.split("-"))

    def _greedy_decode(self, text_batch_logits):
        """Decode (T, batch_size, vocabulary_len) logits to a list of strings."""
        # argmax per time step, then batch-major rows of plain Python ints.
        best_paths = text_batch_logits.argmax(dim = -1).transpose(0, 1).tolist()
        return [
            self.correct_prediction("".join(idx2char[token] for token in path))
            for path in best_paths
        ]

    def get_acc(self, text_batch_logits, labels):
        """Exact-match accuracy of the greedy decoding against ``labels``.

        Args:
            text_batch_logits: Tensor of size (T, batch_size, vocabulary_len).
            labels: Sequence of ground-truth strings, one per batch element.
        """
        # Built directly from lists: no per-step DataFrame and no forced gc.collect().
        return accuracy_score(self._greedy_decode(text_batch_logits), labels)

    def encode_text_batch(self, text_batch):
        """Encode label strings as CTC targets.

        Returns:
            (targets, target_lengths): all label indices concatenated into one
            1-D IntTensor, and an IntTensor with the length of each label.
        """
        text_batch_targets_lens = torch.IntTensor([len(text) for text in text_batch])
        text_batch_targets = torch.IntTensor([char2idx[c] for c in "".join(text_batch)])
        return text_batch_targets, text_batch_targets_lens

    def compute_loss(self, text_batch, text_batch_logits): # labels, context
        """Compute the CTC loss and exact-match accuracy for one batch.

        Args:
            text_batch: list of strings of length equal to batch size
            text_batch_logits: Tensor of size([T, batch_size, num_classes])

        Returns:
            (loss, acc)
        """
        text_batch_logps = F.log_softmax(text_batch_logits, 2) # [T, batch_size, num_classes]
        # Every sample uses the full T time steps as its input length.
        text_batch_logps_lens = torch.full(size=(text_batch_logps.size(1),),
                                        fill_value=text_batch_logps.size(0),
                                        dtype=torch.int32,
                                        device=text_batch_logits.device) # [batch_size]

        text_batch_targets, text_batch_targets_lens = self.encode_text_batch(text_batch)

        # NOTE(review): CTC loss is infinite when a label cannot be aligned within
        # T steps; consider zero_infinity=True on the criterion if that can occur.
        loss = self.criterion(text_batch_logps, text_batch_targets, text_batch_logps_lens, text_batch_targets_lens)
        acc = self.get_acc(text_batch_logits, text_batch)

        return loss, acc

    def _shared_step(self, batch, stage):
        """Forward pass, loss/accuracy and logging shared by train and validation."""
        # batch = (images, labels); labels are used as a sequence of strings.
        images, labels = batch
        loss, acc = self.compute_loss(labels, self(images))
        self.log_dict({f'{stage}_loss': loss, f'{stage}_acc': acc}, prog_bar=True)
        return {
            "loss":loss
        }

    def training_step(self, batch, batch_idx):
        """Log 'train_loss' / 'train_acc' and return the loss."""
        return self._shared_step(batch, 'train')

    def validation_step(self, batch, batch_idx):
        """Log 'val_loss' / 'val_acc' and return the loss."""
        return self._shared_step(batch, 'val')

    def test_step(self, batch, batch_idx):
        pass

    def predict_step(self, batch, batch_idx, dataloader_idx=0):
        """Return raw logits (T, batch_size, vocabulary_len) for an image-only batch."""
        return self(batch)

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=1e-4)
        return optimizer

## 학습 데이터에 대해서 예측 성능을 비교 분석 해봄

In [18]:
# Run inference over the TRAINING images: the dataset is built in test mode
# (no labels, no augmentation) so predictions can later be compared with the labels.
test_dataset = TextDataset(images = train_df['img_path'].reset_index(drop=True))
test_dataloader = DataLoader(dataset = test_dataset, batch_size = config['batch_size'])

In [19]:
# Restore the fold-0 checkpoint and predict logits for every batch of test_dataloader.
# map_location="cpu" loads the checkpoint tensors into host memory, so the load does
# not depend on the device they were saved from. The Trainer moves the model to the
# GPU itself.
temp = torch.load("/content/drive/MyDrive/Colab Notebooks/DACON/2023_교원그룹_AI_챌린지/fold0/epoch=34-step=33670-train_loss=0.0137-train_acc=1.0-val_loss=0.0741-val_acc=0.974964234620887.ckpt", map_location="cpu")
m = OCRModel(config)
m.load_state_dict(temp['state_dict'])
trainer = pl.Trainer(accelerator="gpu")
# p is a list with one (T, batch, vocabulary_len) logits tensor per batch.
p = trainer.predict(m, test_dataloader)

INFO:pytorch_lightning.utilities.rank_zero:GPU available: True (cuda), used: True
INFO:pytorch_lightning.utilities.rank_zero:TPU available: False, using: 0 TPU cores
INFO:pytorch_lightning.utilities.rank_zero:IPU available: False, using: 0 IPUs
INFO:pytorch_lightning.utilities.rank_zero:HPU available: False, using: 0 HPUs
INFO:pytorch_lightning.accelerators.cuda:LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]


Predicting: 0it [00:00, ?it/s]

In [20]:
# predict_step returns time-major logits, so the batch axis is dim 1:
# concatenating there joins the per-batch outputs into one tensor over all samples.
p_cat = torch.cat(p, dim = 1)

In [22]:
p_cat.shape

torch.Size([8, 76888, 2350])

In [24]:
torch.save(p_cat,"/content/drive/MyDrive/Colab Notebooks/DACON/2023_교원그룹_AI_챌린지/predict.pt")

In [29]:
train_df.head()

Unnamed: 0,id,img_path,label,label_len,predict
0,TRAIN_00000,./train/TRAIN_00000.png,빨간색,3,빨간색
1,TRAIN_00001,./train/TRAIN_00001.png,머,1,머
2,TRAIN_00002,./train/TRAIN_00002.png,차차,2,차차
3,TRAIN_00003,./train/TRAIN_00003.png,써,1,써
4,TRAIN_00004,./train/TRAIN_00004.png,놓치다,3,놓치다


In [None]:
# List every training sample whose decoded prediction differs from its label.
mismatched = train_df.loc[train_df['label'] != train_df['predict'], ['label', 'predict']]
for truth, guess in zip(mismatched['label'], mismatched['predict']):
    print(truth, " -- ", guess)

In [34]:
print(train_df.loc[(train_df['label'] != train_df['predict']), ['label', 'predict']].values)

[['걱정스럽다' '걱적스럽다']
 ['탁월하다' '단결하다']
 ['참가' '창가']
 ...
 ['갖' '갗']
 ['다듬다' '다묶다']
 ['생각나다' '생각하다']]


In [30]:
accuracy_score(train_df['label'].values, train_df['predict'].values)

0.9931328685880761

In [None]:
# Always raises AssertionError, so "Run All" stops here and the test-set
# inference cells below only run when executed manually.
# NOTE(review): presumably an intentional stop guard - confirm before removing.
assert False

In [None]:
# Test-set inference: image paths only, evaluation transforms.
test_dataset = TextDataset(images = test_df['img_path'].reset_index(drop=True))
test_dataloader = DataLoader(dataset = test_dataset, batch_size = config['batch_size'])

In [None]:
# Restore the fold-0 checkpoint and predict logits for every batch of test_dataloader.
# map_location="cpu" loads the checkpoint tensors into host memory, so the load does
# not depend on the device they were saved from. The Trainer moves the model to the
# GPU itself.
temp = torch.load("/content/drive/MyDrive/Colab Notebooks/DACON/2023_교원그룹_AI_챌린지/fold0/epoch=34-step=33670-train_loss=0.0137-train_acc=1.0-val_loss=0.0741-val_acc=0.974964234620887.ckpt", map_location="cpu")
m = OCRModel(config)
m.load_state_dict(temp['state_dict'])
trainer = pl.Trainer(accelerator="gpu")
# p is a list with one (T, batch, vocabulary_len) logits tensor per batch.
p = trainer.predict(m, test_dataloader)

In [None]:
# predict_step returns time-major logits, so the batch axis is dim 1:
# concatenating there joins the per-batch outputs into one tensor over all samples.
p_cat = torch.cat(p, dim = 1)

In [None]:
p_cat.shape

In [25]:
# Post-process each sample's raw prediction independently.
def remove_duplicates(text):
    """Collapse each run of identical consecutive characters in ``text`` to one.

    Returns "" for empty input.
    """
    kept = []
    for position, letter in enumerate(text):
        # Keep the first character and every character that differs from its predecessor.
        if position == 0 or letter != text[position - 1]:
            kept.append(letter)
    return "".join(kept)

def correct_prediction(word):
    """Collapse repeats inside each "-"-separated segment of ``word``, then
    join the segments, which removes the "-" separators."""
    return "".join(remove_duplicates(segment) for segment in word.split("-"))

In [26]:
# Greedy decoding: move the batch axis first, take the most likely vocabulary
# index at every time step, and map the indices back to characters.
text_batch_logits = p_cat.permute(1, 0, 2)
text_batch_logits_argmax = text_batch_logits.argmax(dim = -1)

# Raw per-sample strings; blanks ("-") and repeated characters are still present.
text_batch_tokens_new = [
    "".join(idx2char[token] for token in sample_tokens)
    for sample_tokens in text_batch_logits_argmax.tolist()
]

In [28]:
# Store the cleaned predictions (repeats collapsed, blanks removed) next to the labels.
train_df['predict'] = [correct_prediction(raw_text) for raw_text in text_batch_tokens_new]

In [None]:
# Start from the sample submission and fill its label column with the raw decoded strings.
submit = pd.read_csv('./sample_submission.csv').assign(label = text_batch_tokens_new)

In [None]:
# Collapse repeated characters and strip the "-" blanks from every prediction.
submit['label'] = submit['label'].map(correct_prediction)

In [None]:
submit['label']

In [None]:
submit.to_csv('/content/drive/MyDrive/Colab Notebooks/DACON/2023_교원그룹_AI_챌린지/submission4.csv', index=False)