## 환경 설정
Transformers (for the TrOCR model) </br>
Datasets & Jiwer (for the evaluation metric)

In [None]:
## Vessl Requirements
##!pip install torch
##!pip install transformers==4.28.0
##!pip install datasets jiwer
##!pip install ipywidgets==7.5.1
##!jupyter nbextension enable --py widgetsnbextension --sys-prefix
## !pip install torchvision

import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using {device} device")

## 데이터 프로세서와 모델 준비

모델에 먹여주기 위한 데이터를 준비하기 위해 TrOCRProcessor를 사용합니다. </br>
TrOCRProcessor는 ViTFeatureExtractor와 RobertaTokenizer를 포함하는 래퍼입니다. </br>
ViTFeatureExtractor: 이미지 Resize & Normalization. </br>
RobertaTokenizer   : 텍스트 encoding & decoding. </br>

In [None]:
from transformers import VisionEncoderDecoderModel, TrOCRProcessor, AutoTokenizer, DeiTImageProcessor

## 이미지 프로세서와 토크나이저 사전 훈련 모델이 사용한 것과 똑같은 것 가져오기.
DEIT = DeiTImageProcessor.from_pretrained("team-lucid/trocr-small-korean")
ROBERTA = AutoTokenizer.from_pretrained("team-lucid/trocr-small-korean")
processor = TrOCRProcessor(image_processor = DEIT, tokenizer = ROBERTA)
## 사전 훈련 모델 가져오기.
model = VisionEncoderDecoderModel.from_pretrained("team-lucid/trocr-small-korean")
model.to(device)

## 데이터 불러오기

테스트용 샘플 데이터를 불러옵니다. (추후에 모델 연결 과정을 위해 수정할 예정)

In [None]:
import pandas as pd
import zipfile

with zipfile.ZipFile('압축 폴더 위치', 'r') as zip_ref:
    zip_ref.extractall('압축 풀고 저장할 경로')

## 이미지 파일 이름과 텍스트 레이블이 저장된 CSV 파일 불러오기
df = pd.read_csv('레이블 CSV 위치', encoding='UTF-8')
## 이미지 파일 불러올 디렉토리 루트로 설정
root_dir='이미지 불러올 디렉토리 위치/'

데이터셋은 두가지를 리턴합니다. </br>
pixel_values: 이미지 피쳐 </br>
labels      : 텍스트 토큰

##  데이터셋 클래스 생성

In [None]:
from torch.utils.data import Dataset
from PIL import Image

class HandWriting(Dataset):
    def __init__(self, root_dir, df, processor, max_target_length=50):
        self.root_dir = root_dir
        self.df = df
        self.processor = processor
        ## 한줄에 많은 수의 단어가 들어오진 않으니
        ## 적당하게 50토큰 정도로 지정해놓자. (대략 30~40 단어)
        self.max_target_length = max_target_length

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        ## 파일 이름과 텍스트 레이블 가져오기.
        file_name = self.df['file_name'][idx]
        text = self.df['text'][idx]
        ## 프로세서를 통해 이미지 인코딩.
        image = Image.open(self.root_dir + file_name).convert("RGB")
        pixel_values = self.processor(image, return_tensors="pt").pixel_values
        ## 토크나이저를 통해 텍스트 레이블을 인코딩.
        labels = self.processor.tokenizer(text, 
                                          padding="max_length", 
                                          max_length=self.max_target_length).input_ids
        ## 패딩 토큰은 비용 함수가 무시하게끔 설정하기.
        labels = [label if label != self.processor.tokenizer.pad_token_id else -100 for label in labels]
        ## 변환한 이미지와 텍스트에 대한 인코딩을 딕셔너리에 저장. 모델 입력값으로 쓰일 예정.
        encoding = {"pixel_values": pixel_values.squeeze(), "labels": torch.tensor(labels)}
        return encoding

## 데이터 스플릿 & 데이터 로더 생성

In [None]:
from sklearn.model_selection import train_test_split
from torch.utils.data import DataLoader

## 데이터 스플릿
train_df, test_df = train_test_split(df, test_size=0.2, random_state = 0)
## 인덱스 초기화
train_df.reset_index(drop=True, inplace=True)
test_df.reset_index(drop=True, inplace=True)
## 데이터셋 클래스에 먹여주기
train_dataset = HandWriting(root_dir=root_dir, df=train_df, processor=processor)
test_dataset = HandWriting(root_dir=root_dir, df=test_df, processor=processor)
## 데이터 로더
train_dataloader = DataLoader(train_dataset, batch_size=10, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=10)

## 중간 점검

데이터셋 스플릿 확인

In [None]:
print("Number of training examples:", len(train_dataset))
print("Number of validation examples:", len(test_dataset))

이미지 확인

In [None]:
Image.open(train_dataset.root_dir + train_df['file_name'][0]).convert("RGB")

레이블 확인

In [None]:
train_df.loc[0][1]

프로세서로 인코딩 제대로 하는지 확인

In [None]:
encoding = train_dataset[0]
for k,v in encoding.items():
  print(k, v.shape)

레이블 디코딩 제대로 되는지 확인

In [None]:
labels = encoding['labels']
labels[labels == -100] = processor.tokenizer.pad_token_id
label_str = processor.decode(labels, skip_special_tokens=True)
print(label_str)

## 모델 설정

In [None]:
# set special tokens used for creating the decoder_input_ids from the labels
model.config.decoder_start_token_id = processor.tokenizer.cls_token_id
model.config.pad_token_id = processor.tokenizer.pad_token_id
# make sure vocab size is set correctly
model.config.vocab_size = model.config.decoder.vocab_size

# set beam search parameters
model.config.eos_token_id = processor.tokenizer.sep_token_id
model.config.max_length = 64
model.config.early_stopping = True
model.config.no_repeat_ngram_size = 3
model.config.length_penalty = 2.0
model.config.num_beams = 4

## 평가 지표 설정

In [None]:
from datasets import load_metric

cer_metric = load_metric("cer")

def compute_cer(pred_ids, label_ids):
    pred_str = processor.batch_decode(pred_ids, skip_special_tokens=True)
    label_ids[label_ids == -100] = processor.tokenizer.pad_token_id
    label_str = processor.batch_decode(label_ids, skip_special_tokens=True)

    cer = cer_metric.compute(predictions=pred_str, references=label_str)

    return cer

## 모델 훈련

In [None]:
from transformers import AdamW
from tqdm.notebook import tqdm
import torchvision.models as models

optimizer = AdamW(model.parameters(), lr=5e-5)

for epoch in range(1):  # loop over the dataset multiple times
   # train
   model.train()
   train_loss = 0.0
   for batch in tqdm(train_dataloader):
      # get the inputs
      for k,v in batch.items():
        batch[k] = v.to(device)

      # forward + backward + optimize
      outputs = model(**batch)
      loss = outputs.loss
      loss.backward()
      optimizer.step()
      optimizer.zero_grad()

      train_loss += loss.item()

   print(f"Loss after epoch {epoch}:", train_loss/len(train_dataloader))
    
   # evaluate
   model.eval()
   valid_cer = 0.0
   with torch.no_grad():
     for batch in tqdm(test_dataloader):
       # run batch generation
       outputs = model.generate(batch["pixel_values"].to(device))
       # compute metrics
       cer = compute_cer(pred_ids=outputs, label_ids=batch["labels"])
       valid_cer += cer 

   print("Validation CER:", valid_cer / len(test_dataloader))

torch.save(model, '저장할 위치')

## 모델 불러오기

In [None]:
model_path = 'Model/model.pth'
model = torch.load(model_path)
model = model.to('cuda')

## 인퍼런스

In [None]:
import os
root_dir='인퍼런스 진행할 이미지 들어있는 폴더 위치/'
lines = os.listdir(root_dir)
for line in lines:
	image = Image.open(root_dir + line).convert("RGB")
	pixel_values = processor(image, return_tensors="pt").pixel_values
	pixel_values = pixel_values.to('cuda')
	generated_ids = model.generate(pixel_values)
	generated_text = processor.batch_decode(generated_ids, skip_special_tokens=True)[0]
	print(generated_text)