In [None]:
import os
import numpy
import torch
import zipfile
import evaluate
import pandas as pd

#import torchvision.transforms as T

from PIL import Image
from tqdm import tqdm
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
from sklearn.model_selection import train_test_split

from transformers import TrOCRProcessor
from transformers import default_data_collator
from transformers import VisionEncoderDecoderModel
from transformers import Seq2SeqTrainer, Seq2SeqTrainingArguments

print("Succesfully imported all required packages.")

In [None]:
# Paths used throughout the notebook:
#   IMG_DATA_TXT_FILE - tab-separated annotation file read into the file_name/text table
#   IMAGE_ZIP_FILE    - archive that is extracted when IMG_DIR does not exist yet
#   OUTPUT_DIR        - directory handed to the training arguments as output_dir
#   IMG_DIR           - folder whose presence decides whether the archive is extracted
IMG_DATA_TXT_FILE = "Data/augmented_data.txt"
IMAGE_ZIP_FILE = "Data/augmented_data.zip"
OUTPUT_DIR = "Output2"
IMG_DIR = "Data/img"

# exist_ok=True makes the cell safe to re-run and avoids the check-then-create race;
# makedirs also copes with a nested OUTPUT_DIR.
os.makedirs(OUTPUT_DIR, exist_ok=True)

In [None]:
# Disabled one-off preprocessing, kept as a string literal so that it is NOT executed.
# The code inside the string would read IMG_DATA_TXT_FILE, drop the last character of
# every line, join each group of three consecutive lines into one tab-separated line,
# and write the result back over the same file.
# NOTE(review): the embedded code overwrites its input and uses a bare `except:`;
# review both before ever re-enabling it.
'''
try:
    new = ""
    with open(IMG_DATA_TXT_FILE) as f:
        lines = f.readlines()
        for i, line in enumerate(lines):
            new += line[:-1]
            if i % 3 == 2:
                new += "\n"
            else:
                new += "\t"

    with open(IMG_DATA_TXT_FILE, 'w+') as f:
        f.write(new)

    print("Succesfully rewritten test file structure.")
except:
    print("File does not exists:", IMG_DATA_TXT_FILE)
    '''

In [None]:
# Unpack the image archive once; when IMG_DIR already exists only report its size.
if not os.path.exists(IMG_DIR):
    try:
        with zipfile.ZipFile(IMAGE_ZIP_FILE, 'r') as zip_ref:
            zip_ref.extractall("Data")
        print("Succesfully extracted the images.")
    except (OSError, zipfile.BadZipFile) as exc:
        # Still best-effort (the notebook keeps running), but only for the failures
        # extraction can actually produce, and with the reason shown instead of hidden.
        print("Unable to extract:", IMAGE_ZIP_FILE)
        print("Reason:", repr(exc))
else:
    # Use the shared constant rather than repeating the path literal.
    print(IMG_DIR, "folder already contains", len(os.listdir(IMG_DIR)), "items.")

In [None]:
# Read the annotation table (no header row) and give the first two columns
# the names the dataset class looks up: "file_name" and "text".
print("loading data")

_df = pd.read_table(IMG_DATA_TXT_FILE, header=None).rename(
    columns={0: "file_name", 1: "text"}
)

print("data loaded")

In [None]:
class IAMDataset(Dataset):
    """Pairs text images with their tokenized transcriptions.

    Each item is a dict with:
      * "pixel_values": the tensor produced by ``processor`` for the image, with the
        leading batch dimension removed.
      * "labels": a 1-D tensor of exactly ``max_target_length`` token ids in which
        padding positions are replaced by -100.

    Args:
        root_dir: Directory that contains the image files.
        df: DataFrame with a "file_name" and a "text" column.
        processor: Object that is called on a PIL image (``return_tensors="pt"``) and
            returns something with ``.pixel_values``; its ``.tokenizer`` encodes the text.
        max_target_length: Length every label sequence is padded/truncated to.
    """

    # Truncated file-name endings seen in the annotation file, with the missing part.
    _TRUNCATED_SUFFIXES = (('jp', 'g'), ('pn', 'g'), ('augm', 'ented.png'))

    def __init__(self, root_dir, df, processor, max_target_length=128):
        self.root_dir = root_dir
        self.df = df
        self.processor = processor
        self.max_target_length = max_target_length

    def __len__(self):
        """Return the number of rows in the annotation table."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load, preprocess and tokenize the example at position ``idx``.

        Raises:
            IndexError: if ``idx`` is outside the table, which is also what lets a
                plain ``for item in dataset`` loop terminate.
        """
        # Positional lookup: independent of the DataFrame's index labels, and it
        # raises IndexError (not KeyError) past the end.
        file_name = self.df['file_name'].iloc[idx]
        text = self.df['text'].iloc[idx]

        # Repair file names whose extension was cut off.
        for truncated, missing in self._TRUNCATED_SUFFIXES:
            if file_name.endswith(truncated):
                file_name = file_name + missing

        # Open inside a context manager so the file handle is released right away;
        # the converted image is what gets used afterwards.
        with Image.open(os.path.join(self.root_dir, file_name)) as source:
            image = source.convert("RGB")
        pixel_values = self.processor(image, return_tensors="pt").pixel_values

        # Diagnostic only: report images whose processed tensor has an unexpected shape.
        if pixel_values.size() != torch.Size([1, 3, 384, 384]):
            print(file_name)
            print(pixel_values.dim())
            print(pixel_values.size())

        # Encode the text. truncation=True guarantees that over-long transcriptions
        # still yield exactly max_target_length ids, so every item can be stacked
        # into a batch; the former length diagnostic is therefore no longer needed.
        labels = self.processor.tokenizer(text,
                                          padding="max_length",
                                          truncation=True,
                                          max_length=self.max_target_length).input_ids

        # important: make sure that PAD tokens are ignored by the loss function
        pad_token_id = self.processor.tokenizer.pad_token_id
        labels = [token if token != pad_token_id else -100 for token in labels]

        # squeeze(0) removes only the batch dimension added by the processor.
        return {"pixel_values": pixel_values.squeeze(0), "labels": torch.tensor(labels)}

print("IAMDataset Class definition created.")

# Split dataset for train/val

In [None]:
# The loading cell binds the annotation table to `_df`; the plain name `df` is never
# defined in this notebook, so splitting `df` fails with NameError on a fresh kernel.
train_df, test_df = train_test_split(_df, test_size=0.2, random_state=42)
# Renumber both splits from zero so that row label and row position coincide.
train_df = train_df.reset_index(drop=True)
test_df = test_df.reset_index(drop=True)

In [None]:
# Load the TrOCR processor from the "microsoft/trocr-base-handwritten" checkpoint.
# Elsewhere in the notebook it is called on PIL images to obtain pixel_values and its
# `.tokenizer` is used to encode/decode the transcriptions.
processor = TrOCRProcessor.from_pretrained("microsoft/trocr-base-handwritten")

In [None]:
# Create train and eval data.
# Derive the image root from IMG_DIR instead of repeating the path literal, so the
# extraction step and the datasets cannot drift apart. Joining with "" appends the
# trailing separator that the dataset's path handling relies on.
IMAGE_ROOT = os.path.join(IMG_DIR, "")

train_dataset = IAMDataset(root_dir=IMAGE_ROOT,
                           df=train_df,
                           processor=processor)
eval_dataset = IAMDataset(root_dir=IMAGE_ROOT,
                          df=test_df,
                          processor=processor)

print("Number of training examples:", len(train_dataset))
print("Number of validation examples:", len(eval_dataset))

In [None]:
# Check Validity of data.

def _check_dataset(dataset, name):
    """Load every item of ``dataset`` once and decode its labels, reporting failures.

    Items are fetched by explicit index so the loop does not depend on the dataset
    raising IndexError to signal its end. A failing item is reported with its index
    and the underlying error, and the check continues with the next item.

    Args:
        dataset: Indexable dataset whose items are dicts with a "labels" tensor.
        name: Label shown in the progress bar and in the printed messages.
    """
    failures = 0
    for idx in tqdm(range(len(dataset)), desc=name):
        try:
            labels = dataset[idx]['labels']
            # -100 marks padding in the labels; restore the pad id before decoding.
            labels[labels == -100] = processor.tokenizer.pad_token_id
            processor.decode(labels, skip_special_tokens=True)
        except Exception as exc:  # not a bare except: Ctrl-C must still interrupt
            failures += 1
            print("Cannot find item.", name, idx, repr(exc))
    print(name, "items checked:", len(dataset), "failures:", failures)


_check_dataset(eval_dataset, "eval")
_check_dataset(train_dataset, "train")

In [None]:
# Sanity check: turn the labels of the first training example back into text.
encoding = train_dataset[0]
labels = encoding['labels']

# The dataset stores -100 at padding positions; put the pad id back (in place)
# so that the ids can be decoded.
labels.masked_fill_(labels == -100, processor.tokenizer.pad_token_id)

label_str = processor.decode(labels, skip_special_tokens=True)
label_str

In [None]:
# Start from the pretrained "microsoft/trocr-base-stage1" encoder-decoder checkpoint.
model = VisionEncoderDecoderModel.from_pretrained("microsoft/trocr-base-stage1")

In [None]:
# Tokenizer facts kept for inspection. The two bare expressions that used to precede
# these assignments were dropped: their values were discarded because only the last
# expression of a cell is displayed.
vocab = processor.tokenizer.get_vocab()
SPECIALS = set(processor.tokenizer.all_special_ids)
UNK_TOK = processor.tokenizer.unk_token_id

In [None]:
# Load the "cer" metric; compute_metrics calls its compute(predictions=..., references=...).
cer_metric = evaluate.load("cer")

def compute_metrics(pred):
    """Decode predictions and references and score them with ``cer_metric``.

    Args:
        pred: Object exposing ``predictions`` and ``label_ids`` as arrays of token
            ids in which -100 marks positions to ignore.

    Returns:
        dict: ``{"cer": value}`` where ``value`` is what ``cer_metric.compute`` returns.

    Raises:
        Exception: ``("BREAK", label_ids, pred_ids)`` chained to the original error
            when the predictions cannot be decoded.
    """
    pad_token_id = processor.tokenizer.pad_token_id

    # -100 is not a valid token id, so swap it for the pad id before decoding.
    # numpy.where builds new arrays, leaving the caller's arrays untouched.
    raw_pred_ids = numpy.asarray(pred.predictions)
    raw_label_ids = numpy.asarray(pred.label_ids)
    pred_ids = numpy.where(raw_pred_ids == -100, pad_token_id, raw_pred_ids)
    labels_ids = numpy.where(raw_label_ids == -100, pad_token_id, raw_label_ids)

    try:
        pred_str = processor.batch_decode(pred_ids, skip_special_tokens=True)
    except Exception as exc:
        # Keep the offending ids in the error and preserve the root cause.
        raise Exception("BREAK", raw_label_ids, pred_ids) from exc

    label_str = processor.batch_decode(labels_ids, skip_special_tokens=True)

    cer = cer_metric.compute(predictions=pred_str, references=label_str)

    return {"cer": cer}

print("compute_metrics definition created.")

In [None]:
# Special tokens used for creating the decoder_input_ids from the labels, plus the
# vocabulary size copied from the decoder's own config.
model.config.update({
    "decoder_start_token_id": processor.tokenizer.cls_token_id,
    "pad_token_id": processor.tokenizer.pad_token_id,
    "vocab_size": model.config.decoder.vocab_size,
    "eos_token_id": processor.tokenizer.sep_token_id,
})

# Beam search parameters. max_length is deliberately not set here;
# max_new_tokens is used instead.
model.config.update({
    "max_new_tokens": 64,
    "early_stopping": True,
    "no_repeat_ngram_size": 3,
    "length_penalty": 2.0,
    "num_beams": 4,
})

In [None]:
# Training configuration; checkpoints and logs go to OUTPUT_DIR.
training_args = Seq2SeqTrainingArguments(
    output_dir=OUTPUT_DIR,
    # Batch sizes and mixed precision.
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    fp16=True,
    # Step-based evaluation that uses generation for its predictions.
    evaluation_strategy="steps",
    eval_steps=2000,
    predict_with_generate=True,
    # Logging and checkpoint cadence, in training steps.
    logging_steps=2,
    save_steps=1000,
)

In [None]:
# Wire model, data, collator and metric function into the trainer.
trainer = Seq2SeqTrainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=default_data_collator,
    compute_metrics=compute_metrics,
    # The image processor (not the text tokenizer) is passed in the `tokenizer` slot.
    tokenizer=processor.image_processor,
)

print("Initialized all training params.")

In [None]:
# Run the training loop with the trainer configured above in this notebook's state.
trainer.train()

In [None]:
# Persist the trained model to the training output directory.
trainer.save_model(training_args.output_dir)
# The trainer was only handed processor.image_processor, so the text tokenizer is not
# part of what it saves. Saving the full processor next to the model makes the output
# directory loadable on its own for inference.
processor.save_pretrained(training_args.output_dir)