## Setup

In [1]:
!pip install -q transformers
!pip install -q sentencepiece
!pip install -q jiwer
!pip install -q datasets
!pip install -q evaluate
!pip install -q -U accelerate

!pip install -q matplotlib
!pip install -q protobuf==3.20.1
!pip install -q tensorboard

In [1]:
import os
import torch
import evaluate
import numpy as np
import pandas as pd
import glob as glob
import torch.optim as optim
import matplotlib.pyplot as plt
import torchvision.transforms as transforms

from PIL import Image
from tqdm.notebook import tqdm
from dataclasses import dataclass
from torch.utils.data import Dataset
from transformers import (
    VisionEncoderDecoderModel, 
    TrOCRProcessor,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
    default_data_collator
)

# Notebook-wide plotting defaults.
block_plot = False
plt.rcParams['figure.figsize'] = (12, 9)

# ANSI escape sequences: switch bold text on, and reset all text attributes.
bold = "\033[1m"
reset = "\033[0m"

# Prefer the GPU whenever CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")
else:
    print("Using CPU")

Using GPU: NVIDIA GeForce RTX 4060


In [None]:
def seed_everything(seed_value):
    """Seed every random number generator this notebook can draw from.

    Seeds Python's built-in ``random`` module, NumPy, and PyTorch (CPU and
    all CUDA devices), and switches cuDNN to deterministic mode.

    Args:
        seed_value: Integer seed shared by all generators.
    """
    # Local import: `random` is not part of the notebook's import cell.
    import random

    random.seed(seed_value)
    np.random.seed(seed_value)
    torch.manual_seed(seed_value)
    torch.cuda.manual_seed_all(seed_value)
    # Deterministic cuDNN kernels and no auto-tuner, trading speed for repeatability.
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False

# controlling randomness both on CPU and GPU
seed_everything(42)

@dataclass(frozen=True)
class TrainingConfig:
    """Immutable training hyper-parameters."""

    # Samples per device for each training/evaluation step.
    BATCH_SIZE: int = 16
    # Number of full passes over the training set.
    EPOCHS: int = 50
    # Optimizer step size.
    LEARNING_RATE: float = 5e-5

@dataclass(frozen=True)
class DatasetConfig:
    """Immutable dataset location settings."""

    # Root folder of the dataset; make sure this path exists on your machine.
    DATA_ROOT: str = './Datasets/khmer Printed Dataset 10 fonts/'

@dataclass(frozen=True)
class ModelConfig:
    """Immutable model selection settings."""

    # Identifier of the pretrained checkpoint passed to `from_pretrained`.
    MODEL_NAME: str = 'microsoft/trocr-small-printed'

def load_dataset(file_path):
    """Parse an annotation file into a DataFrame of image names and texts.

    Every line is expected to look like ``<file_name> <text>``. The line is
    stripped of surrounding whitespace and split at its first space; lines
    without a space (no text part) are skipped.

    Args:
        file_path: Path of the UTF-8 encoded annotation file.

    Returns:
        A DataFrame with the columns ``file_name`` and ``text``.
    """
    names, labels = [], []

    with open(file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            # partition() cuts at the first space only, so the text keeps its own spaces.
            name, separator, label = raw_line.strip().partition(' ')
            if not separator:
                continue  # no space found: the line has no text part
            names.append(name)
            labels.append(label)

    return pd.DataFrame({'file_name': names, 'text': labels})

# Annotation files of the three splits, all located under the dataset root.
train_file, valid_file, test_file = (
    os.path.join(DatasetConfig.DATA_ROOT, annotation_name)
    for annotation_name in ('kh_train.txt', 'kh_val.txt', 'kh_test.txt')
)

# One DataFrame (file_name, text) per split.
train_df, valid_df, test_df = (
    load_dataset(annotation_path)
    for annotation_path in (train_file, valid_file, test_file)
)

# Augmentations: colour jitter followed by a Gaussian blur.
train_transforms = transforms.Compose([
    transforms.ColorJitter(brightness=.5, hue=.3),
    transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
])

In [4]:
print(train_df.head())

       file_name text
0  content_1.jpg    ក
1  content_2.jpg    ក
2  content_3.jpg    ក
3  content_4.jpg   កក
4  content_5.jpg   កក


In [None]:
class CustomOCRDataset(Dataset):
    """Image/text dataset whose labels are encoded with a custom character dictionary.

    Each item is a dict with ``pixel_values`` (the processor output for one
    image) and ``labels`` (a tensor of exactly ``max_target_length`` ids).

    Args:
        root_dir: Directory that holds the image files named in ``df``.
        df: DataFrame with ``file_name`` and ``text`` columns.
        processor: Image processor, called as
            ``processor(image, return_tensors='pt')``; the result must expose
            ``pixel_values``.
        max_target_length: Length of every encoded label sequence.
        augment: When True (the default, which is the historical behaviour)
            the module-level ``train_transforms`` are applied to each image.
            Pass False for validation/test data so that evaluation runs on
            unmodified images.
        dictionary_path: UTF-8 text file with one dictionary entry per line.
    """

    def __init__(self, root_dir, df, processor, max_target_length=50,
                 augment=True, dictionary_path="./Dicts/unique_characters.txt"):
        self.root_dir = root_dir
        self.df = df
        self.processor = processor
        self.max_target_length = max_target_length
        self.augment = augment
        self.dictionary_path = dictionary_path
        self.dictionary = self.load_dictionary()  # id -> token
        self.word_to_id = {word: idx for idx, word in enumerate(self.dictionary)}  # token -> id

        # Pin the special-token ids so that a duplicate entry in the dictionary
        # file can never move them away from the head of the dictionary.
        self.word_to_id["<START>"] = 0
        self.word_to_id["<END>"] = 1
        self.word_to_id["<UNK>"] = 2
        self.word_to_id["<PAD>"] = 3

    def __len__(self):
        """Return the number of samples (rows of the DataFrame)."""
        return len(self.df)

    def load_dictionary(self):
        """Build the dictionary: special tokens followed by the file entries.

        Returns:
            A list whose positions are the token ids. The first five entries
            are ``<START>``, ``<END>``, ``<UNK>``, ``<PAD>`` and the space
            character; the stripped lines of ``self.dictionary_path`` follow.
        """
        with open(self.dictionary_path, 'r', encoding='utf-8') as file:
            loaded_words = [line.strip() for line in file]

        # The space is listed explicitly because strip() above would erase it
        # from a line of the dictionary file.
        special_tokens = ["<START>", "<END>", "<UNK>", "<PAD>", " "]

        return special_tokens + loaded_words

    def __getitem__(self, idx):
        """Return the encoded sample stored at positional index ``idx``."""
        # Positional lookup, so a DataFrame with a non-default index still works.
        row = self.df.iloc[idx]
        file_name = row['file_name']
        text = row['text']

        # Read the image, optionally augment it, and get the transformed pixels.
        image = Image.open(os.path.join(self.root_dir, str(file_name))).convert('RGB')
        if self.augment:
            image = train_transforms(image)
        pixel_values = self.processor(image, return_tensors='pt').pixel_values

        # Encode the text to obtain the labels using the dictionary.
        labels = self.encode_text(text)

        return {
            "pixel_values": pixel_values.squeeze(),
            "labels": torch.tensor(labels)
        }

    def encode_text(self, text):
        """Encode ``text`` character by character into a fixed-length id list.

        The sequence is ``<START>``, one id per character (``<UNK>`` for
        characters missing from the dictionary), ``<END>``, then ``<PAD>`` up
        to ``max_target_length``. Text that is too long is cut so that the
        ``<END>`` token is always kept.

        NOTE(review): padded positions carry the ``<PAD>`` id rather than
        -100, so a loss that only ignores -100 is also computed on the
        padding — confirm this is intended.

        Returns:
            A list of exactly ``max_target_length`` integer ids.
        """
        unk_id = self.word_to_id["<UNK>"]
        char_ids = [self.word_to_id.get(char, unk_id) for char in text]

        # Reserve two slots for <START> and <END> before truncating the text.
        max_chars = max(self.max_target_length - 2, 0)
        labels = [self.word_to_id["<START>"]] + char_ids[:max_chars] + [self.word_to_id["<END>"]]

        # Only cuts for a degenerate max_target_length below 2.
        labels = labels[:self.max_target_length]
        labels += [self.word_to_id["<PAD>"]] * (self.max_target_length - len(labels))

        return labels

    def decode_labels(self, labels):
        """Decode an iterable of ids back into a string.

        ``<START>``, ``<END>``, ``<PAD>`` and negative ids (such as a -100
        ignore index) are dropped. ``<UNK>`` and ids beyond the dictionary
        are rendered as the literal text ``<UNK>``.

        Args:
            labels: Iterable of ids (Python ints, NumPy ints or 0-d tensors).

        Returns:
            The decoded characters joined without separators.
        """
        skipped_ids = {
            self.word_to_id["<START>"],
            self.word_to_id["<END>"],
            self.word_to_id["<PAD>"],
        }
        unk_id = self.word_to_id["<UNK>"]
        dictionary_size = len(self.dictionary)

        label_str = []
        for label in labels:
            label = int(label)
            if label < 0 or label in skipped_ids:
                continue
            if label == unk_id or label >= dictionary_size:
                label_str.append("<UNK>")
            else:
                label_str.append(self.dictionary[label])

        return "".join(label_str)

processor = TrOCRProcessor.from_pretrained(ModelConfig.MODEL_NAME)

# Only the training split is augmented.
train_dataset = CustomOCRDataset(
    root_dir=os.path.join(DatasetConfig.DATA_ROOT, 'kh_train/'),
    df=train_df,
    processor=processor
)
# Validation and test images are left untouched so metrics are repeatable.
valid_dataset = CustomOCRDataset(
    root_dir=os.path.join(DatasetConfig.DATA_ROOT, 'kh_val/'),
    df=valid_df,
    processor=processor,
    augment=False
)

test_dataset = CustomOCRDataset(
    root_dir=os.path.join(DatasetConfig.DATA_ROOT, 'kh_test/'),
    df=test_df,
    processor=processor,
    augment=False
)




In [13]:
# Spot check: fetch one training sample (index 20000, presumably an arbitrary
# pick) and print its encoded label tensor.
example = train_dataset[20000]
labels = example['labels']
print("Encoded labels:", labels)

# Decode the labels back to a string to confirm the encode/decode round trip.
label_str = train_dataset.decode_labels(labels)
print("Decoded label string:", label_str)

Encoded labels: tensor([  0, 148, 140, 197, 148, 130, 126, 202, 152, 155, 174, 166, 147, 202,
        143, 187, 150, 185, 129, 146, 202, 152, 135, 174, 146, 195,   1,   3,
          3,   3,   3,   3,   3,   3,   3,   3,   3,   3,   3,   3,   3,   3,
          3,   3,   3,   3,   3,   3,   3,   3])
Decoded label string: ពណ៍ពងក្រសាឬផ្ទៃមេឃប្រញាប់


## Initialize the Model

In [14]:
# Load the pretrained checkpoint and move it onto the selected device.
model = VisionEncoderDecoderModel.from_pretrained(ModelConfig.MODEL_NAME)
model = model.to(device)

# Report the total number of parameters ...
total_params = sum(weight.numel() for weight in model.parameters())
print(f"{total_params:,} total parameters.")

# ... and how many of them receive gradients.
total_trainable_params = sum(
    weight.numel() for weight in model.parameters() if weight.requires_grad
)
print(f"{total_trainable_params:,} training parameters.")

Some weights of VisionEncoderDecoderModel were not initialized from the model checkpoint at microsoft/trocr-small-printed and are newly initialized: ['encoder.pooler.dense.bias', 'encoder.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


61,596,672 total parameters.
61,596,672 training parameters.


## Model Configurations

In [15]:
# Set special tokens used for creating the decoder_input_ids from the labels.
# Every id below comes from the custom character dictionary held by the dataset.

model.config.decoder_start_token_id = train_dataset.word_to_id["<START>"]  # Use start token ID
model.config.pad_token_id = train_dataset.word_to_id["<PAD>"]  # Use the <PAD> token ID of the custom dictionary (this is not -100)
# NOTE(review): only the config value is changed on the next line; no resize of
# the decoder embeddings/output layer is visible in this notebook — confirm.
model.config.vocab_size = len(train_dataset.word_to_id)  # Set the vocabulary size based on your custom dictionary
model.config.eos_token_id = train_dataset.word_to_id["<END>"]  # Use end token ID

# Generation settings; max_length equals the dataset's default max_target_length (50).
model.config.max_length = 50
model.config.early_stopping = True
model.config.no_repeat_ngram_size = 3
model.config.length_penalty = 2.0
model.config.num_beams = 4

We use the AdamW optimizer here with a weight decay of 0.0005.

In [16]:
# AdamW over every model parameter; the learning rate comes from TrainingConfig
# and the weight decay is fixed at 0.0005.
optimizer = optim.AdamW(
    model.parameters(), lr=TrainingConfig.LEARNING_RATE, weight_decay=0.0005
)

## Evaluation Metric

In [17]:
# Load the 'cer' (character error rate) metric through the `evaluate` library.
cer_metric = evaluate.load('cer')

In [None]:
def compute_cer(pred, dataset):
    """Compute the character error rate of generated ids against label ids.

    Predictions and labels are both decoded with the dataset's custom
    dictionary, the same decoding that ``ocr`` applies at inference time,
    so both strings live in the same vocabulary.

    Args:
        pred: Object exposing ``predictions`` and ``label_ids`` as sequences
            of id sequences (as handed to ``compute_metrics`` by the trainer).
        dataset: Dataset providing ``dictionary``, ``word_to_id`` and
            ``decode_labels``.

    Returns:
        ``{"cer": <value returned by cer_metric.compute>}``.
    """
    dictionary_size = len(dataset.dictionary)
    unk_id = dataset.word_to_id["<UNK>"]

    def _decode(id_sequence):
        # Sanitise ids so that decode_labels never indexes outside the dictionary.
        clean_ids = []
        for token_id in id_sequence:
            token_id = int(token_id)
            if token_id < 0:
                continue  # negative ids (e.g. a -100 fill value) carry no character
            clean_ids.append(token_id if token_id < dictionary_size else unk_id)
        return dataset.decode_labels(clean_ids)

    pred_str = [_decode(pred_ids) for pred_ids in pred.predictions]
    label_str = [_decode(label_ids) for label_ids in pred.label_ids]

    # Calculate CER
    cer = cer_metric.compute(predictions=pred_str, references=label_str)
    return {"cer": cer}


### Training and Validation Loops

In [None]:
# Trainer settings: batch size and epoch count come from TrainingConfig;
# evaluation, logging and checkpoint saving are all scheduled per epoch.
training_args = Seq2SeqTrainingArguments(
    predict_with_generate=True,
    evaluation_strategy='epoch',
    per_device_train_batch_size=TrainingConfig.BATCH_SIZE,
    per_device_eval_batch_size=TrainingConfig.BATCH_SIZE,
    fp16=True,
    # Checkpoints are written below this directory.
    output_dir='./Model/experiments_02',
    logging_strategy='epoch',
    save_strategy='epoch',
    save_total_limit=10,
    report_to='tensorboard',
    num_train_epochs=TrainingConfig.EPOCHS
)



In [20]:
trainer = Seq2SeqTrainer(
    model=model,
    tokenizer=processor.feature_extractor,
    args=training_args,
    # compute_cer needs a dataset for decoding, so bind the training dataset.
    compute_metrics=lambda pred: compute_cer(pred, train_dataset),
    train_dataset=train_dataset,
    eval_dataset=valid_dataset,
    data_collator=default_data_collator,
    # Use the AdamW optimizer configured earlier in the notebook (otherwise it
    # is never used); None lets the Trainer build its default LR scheduler.
    optimizers=(optimizer, None)
)



In [21]:
def check_device(model, sample_data):
    """Print whether the model and every tensor of a sample live on the GPU.

    Args:
        model: Module whose first parameter decides the model's device.
        sample_data: Mapping whose values expose ``is_cuda``.
    """
    # The first parameter stands in for the whole model.
    first_parameter = next(model.parameters())

    # Stop at the first tensor that is not on the GPU.
    data_on_gpu = True
    for value in sample_data.values():
        if not value.is_cuda:
            data_on_gpu = False
            break

    print(f"Model is on GPU: {first_parameter.is_cuda}")
    print(f"Sample data is on GPU: {data_on_gpu}")

# Create a sample data point to check (you can adjust this as needed)
sample_index = 0
sample_data = train_dataset[sample_index]
sample_data['pixel_values'] = sample_data['pixel_values'].to(device)  # Move to GPU if available
sample_data['labels'] = sample_data['labels'].to(device)  # Move to GPU if available

# Check if the model and sample data are on the correct device
check_device(model, sample_data)

Model is on GPU: True
Sample data is on GPU: True


In [22]:
def print_gpu_name():
    """Print the name of the first CUDA device, or a notice that the CPU is used."""
    if not torch.cuda.is_available():
        print("CUDA is not available. Using CPU.")
        return

    # Device index 0 is the first GPU.
    print(f"Using GPU: {torch.cuda.get_device_name(0)}")

print_gpu_name()

Using GPU: NVIDIA GeForce RTX 4060


In [None]:
res = trainer.train()

## Inference
- Don't forget to point the checkpoint path below at the correct checkpoint directory.

In [None]:
# Fine-tuned checkpoint to evaluate. The path is relative to the notebook,
# under the same './Model/experiments_02' directory that training writes to;
# change the checkpoint number to the one you want to load.
CHECKPOINT_DIR = './Model/experiments_02/checkpoint-141150'

processor = TrOCRProcessor.from_pretrained(ModelConfig.MODEL_NAME)
trained_model = VisionEncoderDecoderModel.from_pretrained(CHECKPOINT_DIR).to(device)



In [None]:
def read_and_show(image_path):
    """Open the image at ``image_path`` and return it converted to RGB.

    Despite the name, nothing is displayed here; the caller does the plotting.
    """
    return Image.open(image_path).convert('RGB')

In [None]:
def ocr(image, processor, model, train_dataset):
    """Recognise the text on one image and return it as a string.

    Args:
        image: Image accepted by ``processor``.
        processor: Called as ``processor(image, return_tensors='pt')``; the
            result must expose ``pixel_values``.
        model: Generation model exposing ``device`` and ``generate``.
        train_dataset: Dataset providing ``dictionary``, ``word_to_id`` and
            ``decode_labels`` for turning ids back into characters.

    Returns:
        The decoded text of the first generated sequence.
    """
    # Put the input on the model's own device rather than a notebook global.
    pixel_values = processor(image, return_tensors='pt').pixel_values.to(model.device)
    generated_ids = model.generate(pixel_values)
    print(generated_ids)

    # Ids outside the custom dictionary cannot be decoded; map them to <UNK>
    # instead of letting decode_labels index past the end of the dictionary.
    dictionary_size = len(train_dataset.dictionary)
    unk_id = train_dataset.word_to_id["<UNK>"]
    token_ids = [
        token_id if 0 <= token_id < dictionary_size else unk_id
        for token_id in generated_ids[0].tolist()
    ]

    generated_text = train_dataset.decode_labels(token_ids)
    print(generated_text)
    return generated_text


In [None]:
def eval_new_data(
    data_path=os.path.join(DatasetConfig.DATA_ROOT, 'kh_test', '*'),
    num_samples=50
):
    """Run OCR on up to ``num_samples`` images and plot each with its prediction.

    Uses the notebook-level ``processor``, ``trained_model`` and
    ``train_dataset``.

    Args:
        data_path: Glob pattern selecting the image files.
        num_samples: Maximum number of images to process; ``None`` or a
            negative value processes every match.
    """
    # Sort so the same images are shown in the same order on every run.
    image_paths = sorted(glob.glob(data_path))
    # Cut the list up front so the progress bar total matches the work done.
    if num_samples is not None and num_samples >= 0:
        image_paths = image_paths[:num_samples]

    for image_path in tqdm(image_paths, total=len(image_paths)):
        image = read_and_show(image_path)
        text = ocr(image, processor, trained_model, train_dataset)
        plt.figure(figsize=(7, 4))
        plt.imshow(image)
        plt.title(text)
        plt.axis('off')
        plt.show()

In [None]:
eval_new_data(
    data_path=os.path.join(DatasetConfig.DATA_ROOT, 'kh_test', '*'),
    num_samples=20
)