Import `os` and set OS environment variables here.

In [None]:
# Alert! CUDA_LAUNCH_BLOCKING is for debugging purposes only. It is not recommended in production.
# This will SLOW DOWN the training process.

# import os

# os.environ['CUDA_LAUNCH_BLOCKING'] = "1"

Import the libraries used for training.

In [None]:
import pandas as pd
import numpy as np
import torch
import matplotlib.pyplot as plt
from torch import nn, Tensor
from torch.nn import CrossEntropyLoss
from collections import defaultdict
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torchvision.models as models
from torchvision.models import resnet18, ResNet18_Weights
from transformers import PreTrainedTokenizerFast
import time
from PIL import Image
import os
import copy
from tqdm.notebook import tqdm
import wandb

Initialize Weights & Biases (wandb) to record the experiment.

In [None]:
# Name of this notebook, exposed to wandb through the environment.
os.environ["WANDB_NOTEBOOK_NAME"] = "OCR_extract.ipynb"

wandb.init(project="OCR_Recognition", name="ResNet18_LSTM_1")

# Record the hyperparameters of the experiment. The values below describe what the
# notebook actually builds further down (ResNet18 backbone with IMAGENET1K_V1 weights,
# AdamW, CosineAnnealingWarmRestarts, CrossEntropyLoss) so the logged run is not mislabelled.
wandb.config.update({"starting_learning_rate": 0.001, "epochs": 200, "batch_size": 32})
wandb.config.update({"cnn_backend": "ResNet18", "dataset": "OCR", "optimizer": "AdamW", "scheduler": "CosineAnnealingWarmRestarts"})
wandb.config.update({"loss_function": "CrossEntropyLoss", "pretrained": True, "pretrained_weights": "IMAGENET1K_V1"})
wandb.config.update({"lr_scheduler": "CosineAnnealingWarmRestarts", "T_0": 20, "T_mult": 1, "eta_min": 0.0001})

Build the configuration class

In [None]:
class CFG:
    """
    Static run configuration.

    run_training() forwards every attribute whose name does not contain '__'
    to wandb.init(config=...), and uses `anonymous`, `image_size`,
    `backend_model`, `force`, `notes` and `save_code` to set up the wandb run.
    """
    anonymous = 'allow'
    backend_model = 'resnet18'
    notes = "This is a self-developed model with pretrained resnet18 as backbone and LSTM as decoder."
    save_code = True # save code in wandb
    image_size = [500, 1200] # [height, width], same values as the Resize transform
    group = None
    force = True # The user must have logged into the wandb account
    train_batch_size = 32
    test_batch_size = 32
    epochs = 200
    optimizer = 'AdamW'
    scheduler = 'CosineAnnealingwithWarmRestarts'
    T_0 = 20
    T_mult = 1
    # Kept in sync with the scheduler the notebook constructs (eta_min=0.0001).
    # The previous value 0.001 equalled the starting learning rate, which would
    # describe a schedule that never anneals.
    eta_min = 0.0001
    last_epoch = -1
    verbose = False
    loss_function = 'CrossEntropyLoss' # 'CTCLoss' would be tested after the first 200 epochs
    pretrained = True
    pretrained_resnet18_weights = 'IMAGENET1K_V1'
    pretrained_resnet50_weights = 'IMAGENET1K_V2'

Define the Custom Tokenizer used for OCR Detection Task

In [None]:
# Build the tokenizer for the OCR task from a serialized tokenizer file.
tokenizer = PreTrainedTokenizerFast(
    tokenizer_file='C:/Users/ra78lof/occinference/byte-level-BPE.tokenizer.json'
)
#feature_extractor = AutoFeatureExtractor.from_pretrained('microsoft/swin-base-batch4-window7-224-in22k')

# Register a PAD token; padding the label sequences fails without one.
tokenizer.add_special_tokens(dict(pad_token="pad_token"))

# Handy checks while debugging (uncomment as needed):
# tokenizer.decode(62)            # blank token, required for CTC loss
# print(tokenizer.pad_token_id)   # id used for padding sequences
# print(len(tokenizer))           # vocabulary size, needed when building the model

Define the CustomDataset Class

In [None]:
class CustomDataset(Dataset):
    """
    Image/label dataset backed by an Excel sheet.

    The sheet must provide an 'ImageName' column (path or file name of each
    image) and a 'Labels' column (the text shown in the image). Each item is a
    pair (image, label_ids): the image is loaded as single-channel grayscale
    and passed through `transform` if given; the label is tokenized and
    padded/truncated to exactly `max_target_length` token ids.

    Args:
        excel_file: Path of the Excel file read with pandas.read_excel.
        img_dir: Directory containing the images. An 'ImageName' entry is looked
            up inside this directory first; if nothing exists there the entry is
            opened as given, which is what earlier versions of this class did.
        tokenizer: Tokenizer used to encode the labels.
        feature_extractor: Stored on the instance, not used by this class.
        transform: Optional callable applied to the PIL image.
        max_target_length: Fixed length of every returned label sequence.
    """

    def __init__(self, excel_file, img_dir, tokenizer = tokenizer, feature_extractor = None, transform=None, max_target_length = 45):
        self.data = pd.read_excel(excel_file)
        self.img_dir = img_dir
        self.tokenizer = tokenizer
        self.feature_extractor = feature_extractor
        self.transform = transform
        self.max_target_length = max_target_length

    def __len__(self):
        """Number of rows in the Excel sheet."""
        return len(self.data)

    def _resolve_image_path(self, img_name):
        """Return the path to open for `img_name`, preferring a file inside `img_dir`."""
        if self.img_dir:
            # os.path.join leaves an absolute `img_name` untouched, so sheets
            # that already store full paths keep working unchanged.
            candidate = os.path.join(self.img_dir, img_name)
            if os.path.exists(candidate):
                return candidate
        return img_name

    def __getitem__(self, idx):
        """Return (image, label_ids) for the row at position `idx`."""
        # Positional access: stays correct even if the frame's index is not 0..n-1.
        img_name = self.data['ImageName'].iloc[idx]
        text = self.data['Labels'].iloc[idx]

        # Close the file handle as soon as the grayscale copy has been created.
        with Image.open(self._resolve_image_path(img_name)) as raw_image:
            image = raw_image.convert('L')

        if self.transform:
            image = self.transform(image)

        # The labels MUST be tokenized and brought to one common length:
        # shorter labels are padded, longer ones truncated, otherwise samples
        # of different length cannot be stacked into a batch.
        labels = self.tokenizer(
            text,
            padding = 'max_length',
            truncation = True,
            max_length = self.max_target_length,
        ).input_ids

        return image, torch.as_tensor(labels)

Define our model architecture. This CNN-LSTM architecture consists of the following parts: a modified ResNet18 with pretrained weights, several intermediate CNN layers, and LSTM layers.

In [None]:
class ModifiedResNet(nn.Module):
    """
    A modified ResNet architecture for Optical Character Recognition (OCR).

    The feature map produced by the ResNet trunk is refined by three
    conv/batch-norm/ReLU stages, reduced to 128 channels, flattened row by row
    into a sequence of length height * width, and decoded by a bidirectional
    LSTM followed by a linear classifier.

    Attributes:
        features (nn.Sequential): All children of the original ResNet except the last two (avgpool and fc).
        conv1 (nn.Conv2d): 1-channel input convolution. It is NOT used in forward(); it is kept only so
            that the parameter names of existing checkpoints still match.
        post_resnet1 (nn.Conv2d): Convolution layer following the features layer.
        bn1 (nn.BatchNorm2d): Batch normalization layer following post_resnet1.
        relu1 (nn.ReLU): ReLU activation layer following bn1.
        post_resnet2 (nn.Conv2d): Another convolution layer following relu1.
        bn2 (nn.BatchNorm2d): Batch normalization layer following post_resnet2.
        relu2 (nn.ReLU): ReLU activation layer following bn2.
        post_resnet3 (nn.Conv2d): Another convolution layer following relu2.
        bn3 (nn.BatchNorm2d): Batch normalization layer following post_resnet3.
        relu3 (nn.ReLU): ReLU activation layer following bn3.
        dwv (nn.Conv2d): 1x1 (pointwise) convolution reducing 512 channels to 128.
        lstm1 (nn.LSTM): Two-layer bidirectional LSTM running over the flattened spatial positions.
        linear1 (nn.Linear): Fully connected layer projecting the LSTM output to class scores.
    """

    def __init__(self, original_resnet: nn.Module, num_classes: int = 82):
        """
        Initializes the ModifiedResNet with an original_resnet model.

        Args:
            original_resnet (nn.Module): The original ResNet model. Its last two children are dropped;
                the remaining ones must output 512 channels.
            num_classes (int): Size of the output vocabulary. Defaults to 82
                (number of classes including the PAD token).
        """
        super(ModifiedResNet, self).__init__()
        self.features = nn.Sequential(*list(original_resnet.children())[:-2]) # Remove avgpool and fc layers in the original resnet
        # Unused in forward(): the caller already swaps the backbone's first conv for a 1-channel one.
        # Removing it would change the state_dict keys and break loading of saved checkpoints.
        self.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

        self.post_resnet1 = nn.Conv2d(512, 512, kernel_size=(2, 3), stride=(2, 2), padding=(1, 1), bias=False)
        self.bn1 = nn.BatchNorm2d(512)
        self.relu1 = nn.ReLU(inplace=True)

        self.post_resnet2 = nn.Conv2d(512, 512, kernel_size=(3, 4), stride=(1, 1), padding=(1, 1), bias=False)
        self.bn2 = nn.BatchNorm2d(512)
        self.relu2 = nn.ReLU(inplace=True)

        self.post_resnet3 = nn.Conv2d(512, 512, kernel_size=(2, 3), stride=(2, 2), padding=(1, 1), bias=False)
        self.bn3 = nn.BatchNorm2d(512)
        self.relu3 = nn.ReLU(inplace=True)

        self.dwv = nn.Conv2d(512, 128, kernel_size=(1, 1), stride=(1, 1), padding=(0, 0), bias=False) # 1x1 convolution for channel reduction
        # batch_first=True is essential: forward() feeds (batch, sequence, features). With the default
        # (batch_first=False) the LSTM would treat the batch axis as time and mix different samples.
        # NOTE: weights trained before this fix were fitted to the old axis order and should be retrained.
        self.lstm1 = nn.LSTM(bidirectional=True, num_layers=2, input_size=128, hidden_size=128, dropout=0, batch_first=True)
        self.linear1 = nn.Linear(256, num_classes) # 256 = 2 directions * hidden_size 128

    def forward(self, x: Tensor) -> Tensor:
        """
        Defines the forward pass of the ModifiedResNet.

        Args:
            x (Tensor): Input batch of shape (batch_size, channels, height, width) accepted by `features`.

        Returns:
            Tensor: Unnormalized class scores of shape (batch_size, sequence_length, num_classes), where
            sequence_length is height * width of the final feature map.
        """
        x = self.features(x)

        x = self.post_resnet1(x)
        x = self.bn1(x)
        x = self.relu1(x)

        x = self.post_resnet2(x)
        x = self.bn2(x)
        x = self.relu2(x)

        x = self.post_resnet3(x)
        x = self.bn3(x)
        x = self.relu3(x)

        x = self.dwv(x)

        batch_size, channels, height, width = x.size()
        x = x.permute(0, 2, 3, 1).contiguous() # (batch, height, width, channels): channels last for the LSTM
        x = x.view(batch_size, height * width, channels) # Reshape to (batch_size, sequence_length, input_dim)
        x, _ = self.lstm1(x)
        x = self.linear1(x)
        return x


A simple debug test with a dummy input

In [None]:
# Debug test for the ModifiedResNet: push one random batch through the network
# and print the shape of the result.


# Use the GPU when one is available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Random batch of 32 single-channel images of size 500 x 1200.
dummy_input = torch.randn(32, 1, 500, 1200).to(device)

# ImageNet-pretrained ResNet18 whose first convolution is replaced by a new
# (freshly initialized) one taking 1 input channel instead of 3.
original_resnet = models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
original_resnet.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)

model= ModifiedResNet(original_resnet).to(device)

# No gradients are needed for a shape check.
with torch.no_grad():
    output = model(dummy_input)
    # forward() returns (batch_size, sequence_length, number_of_classes)
    print(output.shape)

Prepare the training, validation and test datasets

In [None]:
# Define the transforms for the dataset.
# The target size (height 500, width 1200) is the same as the dummy input used
# in the debug test of the model.

transform = transforms.Compose([
    transforms.Resize((500, 1200)),  # Resize images to the required dimensions
    transforms.ToTensor(),  # Convert PIL image to PyTorch tensor
])

In [None]:
# Load Training, Validation and Test data.
# Each dataset reads its own Excel sheet and shares the tokenizer and the
# image transform defined above.

train_dataset = CustomDataset(excel_file='C:/Users/ra78lof/occinference/Train_data.xlsx',
                             img_dir='C:/Users/ra78lof/occinference/Train_data/', tokenizer = tokenizer, transform=transform)

# NOTE(review): the validation data lives under a different user directory than
# the training and test data - confirm this path is intended.
valid_dataset = CustomDataset(excel_file='C:/Users/LMMISTA-WAP265/OcciGen/data/dom_project/Val_data.xlsx',
                              img_dir='C:/Users/LMMISTA-WAP265/OcciGen/data/dom_project/Val_data/', tokenizer = tokenizer, transform=transform)

test_dataset = CustomDataset(excel_file='C:/Users/ra78lof/occinference/Test_data.xlsx',
                             img_dir='C:/Users/ra78lof/occinference/Test_data/', tokenizer = tokenizer, transform=transform)

In [None]:
# Define the DataLoaders.
# Only the training data is shuffled; validation and test keep the sheet order
# so that predictions can be matched to rows.

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=32, shuffle=False)
# The test loader must wrap the test split (it previously wrapped train_dataset,
# so every "test" metric and prediction was computed on training data).
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

Define the model, optimizer, scheduler and loss function

In [None]:
# Define the model, optimizer, scheduler and loss function

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# ImageNet-pretrained ResNet18 whose first convolution is replaced by a new one
# taking 1 input channel, wrapped into the CNN-LSTM model.
original_resnet = models.resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)
original_resnet.conv1 = nn.Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
model= ModifiedResNet(original_resnet).to(device)

# The optimizer (and through it the scheduler) is bound to the parameters of
# THIS model instance; train this instance, not a newly created one.
optimizer = optim.AdamW(model.parameters(), lr=0.001, weight_decay=0.01)
# `verbose` is no longer passed: it is a deprecated scheduler argument and
# omitting it is equivalent to the former verbose=False.
scheduler = optim.lr_scheduler.CosineAnnealingWarmRestarts(optimizer, T_0=20, T_mult=1, eta_min=0.0001, last_epoch=-1)
# Number of batches per epoch
iters = len(train_loader)
# The following scheduler can be used during validation
# scheduler = lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=4, threshold=0.0001, min_lr=0.00001)
# ...valid_loss += loss.item()
#    scheduler.step(valid_loss)...

# The CTC loss function is returning blank predictions for some reason
# criterion = CTCLoss(blank=0, reduction='mean', zero_infinity=True)
criterion = CrossEntropyLoss()

Define the train process

In [None]:
# Optional: let wandb track the model during training
# wandb.watch(model, log="all")
# Total number of epochs; passed to run_training() as num_epochs.
epoch = 200

def training(model: torch.nn.Module, 
                     train_loader: DataLoader, 
                     optimizer: optim.Optimizer,
                     scheduler: "optim.lr_scheduler.LRScheduler | None", 
                     criterion: nn.Module,
                     device: torch.device):
    """
    Train the model for one epoch.

    Parameters:
    - model: The model to train. Its output must have shape
      (batch, sequence, classes) and match the targets' (batch, sequence).
    - train_loader: The data loader for training data.
    - optimizer: The optimizer to use.
    - scheduler: The learning rate scheduler, or None. It is stepped after every
      batch with a fractional epoch value, as supported by
      CosineAnnealingWarmRestarts.
    - criterion: The loss function.
    - device: The device to train on (In our case would be cuda).

    Returns:
    - The average training loss for the epoch, i.e. the mean of the per-batch
      losses (0.0 if the loader yields no batch).
    """

    model.train()
    num_batches = len(train_loader)
    loss_sum = 0.0
    epoch_train_loss = 0.0

    # Number of epochs the scheduler has completed so far. Because the last
    # batch below steps the scheduler to exactly `epoch_start + 1`, this value
    # advances by one per call. The schedule therefore progresses across
    # epochs instead of depending on a module-level `epoch` constant, which
    # made it restart at the same position in every epoch.
    epoch_start = scheduler.last_epoch if scheduler is not None else 0

    pbar = tqdm(enumerate(train_loader), total=num_batches, desc='Train')
    for batch_idx, (data, target) in pbar:
        data = data.to(device)
        target = target.to(device)

        optimizer.zero_grad()
        output = model(data)

        # Flatten (batch, sequence, classes) -> (batch * sequence, classes) for the loss
        loss = criterion(output.reshape(-1, output.size(2)), target.reshape(-1))

        loss.backward()
        optimizer.step()
        if scheduler is not None:
            scheduler.step(epoch_start + (batch_idx + 1) / num_batches)

        loss_sum += loss.item()
        # Running mean over the batches seen so far. The criterion already
        # averages within a batch, so dividing by the number of samples would
        # shrink the value by roughly the batch size.
        epoch_train_loss = loss_sum / (batch_idx + 1)

        current_lr = optimizer.param_groups[0]['lr']
        pbar.set_postfix(train_loss = f'{epoch_train_loss:.4f}',
                        lr = f'{current_lr:.6f}')

    # Do not use print if the tqdm is used
    return epoch_train_loss

Define the validation process

In [None]:
def validating(model: torch.nn.Module, 
               valid_loader: DataLoader, 
               criterion: nn.Module, 
               device: torch.device):
    """
    Evaluate the model on the validation data for one epoch (no weight updates).

    Parameters:
    - model: The model to validate. Its output must have shape
      (batch, sequence, classes) and match the targets' (batch, sequence).
    - valid_loader: The data loader for validating data.
    - criterion: The loss function.
    - device: The device to validate on (e.g., 'cuda').

    Returns:
    - The average validating loss for the epoch, i.e. the mean of the per-batch
      losses (0.0 if the loader yields no batch).
    """
    model.eval()
    loss_sum = 0.0
    epoch_valid_loss = 0.0

    pbar = tqdm(enumerate(valid_loader), total=len(valid_loader), desc='Valid')
    with torch.no_grad():
        for batch_idx, (data, target) in pbar:
            data = data.to(device)
            target = target.to(device)
            output = model(data)

            # Flatten (batch, sequence, classes) -> (batch * sequence, classes) for the loss
            loss = criterion(output.reshape(-1, output.size(2)), target.reshape(-1))

            loss_sum += loss.item()
            # Running mean over the batches seen so far. The criterion already
            # averages within a batch, so the sum is divided by the number of
            # batches, not by the number of samples.
            epoch_valid_loss = loss_sum / (batch_idx + 1)
            # If the Scheduler like ReduceLROnPlateau is used, then the scheduler.step() should be used after each epoch
            # scheduler.step(valid_loss)

            pbar.set_postfix(valid_loss = f'{epoch_valid_loss:.4f}')
    return epoch_valid_loss

Define the test process

In [None]:
def testing(model: torch.nn.Module, 
         test_loader: DataLoader, 
         criterion: nn.Module, 
         device: torch.device):
    """
    Train the model for one epoch.

    Parameters:
    - model: The model to test.
    - test_loader: The data loader for testing data.
    - criterion: The loss function.
    - device: The device to test on (e.g., 'cuda').

    Returns:
    - The average validating loss for the epoch.
    """
    model.load_state_dict(torch.load('C:/Users/ra78lof/occinference/ocr_model.pt'))

    model.eval()
    test_loss = 0

    pbar = tqdm(enumerate(test_loader), total=len(test_loader), desc='Test')
    with torch.no_grad():
        for _, (data, target) in enumerate(test_loader):
            data = data.to(device)
            target = target.to(device)
            output = model(data)

            loss = criterion(output.view(-1, output.size(2)), target.view(-1))
            test_loss += loss.item()
            epoch_test_loss = test_loss / len(test_loader.dataset)

            pbar.set_postfix(test_loss = f'{epoch_test_loss:.4f}')
    return epoch_test_loss

Save the trained model

In [None]:
def save_model(model: torch.nn.Module, 
               filename: str, 
               is_best: bool = False):
    """
    Write the model's state dict to `filename`.

    When `is_best` is true the written file is additionally handed to
    wandb.save.
    """
    state = model.state_dict()
    torch.save(state, filename)
    if not is_best:
        return
    wandb.save(filename)

Define the whole training, validating and test process

In [None]:
def run_training(model: torch.nn.Module, 
                 train_loader: DataLoader,
                 valid_loader: DataLoader,
                 test_loader: DataLoader,
                 optimizer: torch.optim.Optimizer, 
                 scheduler: "torch.optim.lr_scheduler.LRScheduler | None",
                 criterion: nn.Module,
                 device: torch.device, 
                 num_epochs: int):
    """
    Train the model for a specified number of epochs and log metrics to wandb.

    Every epoch runs training(), validating() and testing(), logs the three
    losses and the current learning rate, writes 'last_epoch.pt', and writes
    'best_epoch.pt' whenever the test loss reaches a new minimum.

    Parameters:
    - model: The model to train.
    - train_loader: The data loader for training data.
    - valid_loader: The data loader for validation data.
    - test_loader: The data loader for test data.
    - optimizer: The optimizer to use. It must have been built from the
      parameters of `model`.
    - scheduler: The learning rate scheduler (may be None).
    - criterion: The loss function.
    - device: The device to train on (e.g., 'cuda').
    - num_epochs: The number of epochs to train.

    Returns:
    - Tuple containing the trained model (with the weights of the best epoch
      loaded) and the training history (dict of per-epoch loss lists).
    """
    # ema = EMA(model)

    # Test if CUDA is available
    if torch.cuda.is_available():
        print(f"cuda: {torch.cuda.get_device_name()}\n")

    start_time = time.time()
    best_model_weights = copy.deepcopy(model.state_dict())
    # Losses are minimized, so the running bests start at +infinity. (Starting
    # at -infinity could never be improved upon, and `best_test` used to be
    # read before it was ever assigned.)
    best_valid = np.inf
    best_test = np.inf
    best_epoch = -1
    history = defaultdict(list)

    model = model.to(device)
    criterion = criterion.to(device)

    run = wandb.init(
        project='OCR_Recognition',
        config={k: v for k, v in dict(vars(CFG)).items() if '__' not in k},
        anonymous=CFG.anonymous,
        name=f"dim-{CFG.image_size[0]}x{CFG.image_size[1]}|model-{CFG.backend_model}",
        #group=CFG.comment,
        force = CFG.force,
        notes = CFG.notes,
        save_code = CFG.save_code
    )
    # Register the model only after the run used for logging has been initialized
    wandb.watch(model, log_freq=100)

    for epoch in range(1, num_epochs + 1):
        print(f'Epoch {epoch}/{num_epochs}', end='')

        train_loss = training(model, train_loader, optimizer, scheduler, criterion, device)
        val_loss= validating(model, valid_loader, criterion, device)
        test_loss = testing(model, test_loader, criterion, device)

        history['Train Loss'].append(train_loss)
        history['Valid Loss'].append(val_loss)
        history['Test Loss'].append(test_loss)

        # Log the metrics. The learning rate is read from the optimizer so that
        # logging also works when no scheduler is used.
        wandb.log({"Train Loss": train_loss,
                   "Valid Loss": val_loss,
                   "Test Loss": test_loss,
                   "LR": optimizer.param_groups[0]['lr']})

        if val_loss <= best_valid:
            print(f"Valid Loss Improved ({best_valid:0.4f} ---> {val_loss:0.4f})")
            best_valid = val_loss
            run.summary["Best Valid"]  = best_valid

        # NOTE(review): the best checkpoint is selected on the TEST loss, as before.
        # Selecting on the validation loss would keep the test set unseen - confirm
        # which behavior is wanted.
        if test_loss <= best_test:
            print(f"Test Loss Improved ({best_test:0.4f} ---> {test_loss:0.4f})")
            best_test = test_loss
            best_epoch = epoch
            run.summary["Best Test"] = best_test   
            run.summary["Best Epoch"] = best_epoch
            # deep copy the model weights: state_dict() returns references to the live tensors
            best_model_weights = copy.deepcopy(model.state_dict())
            PATH = "best_epoch.pt"
            torch.save(model.state_dict(), PATH)
            # Save a model file from the current directory
            wandb.save(PATH)
            print(f"Model Saved")

        # Always keep the most recent weights as well
        PATH = "last_epoch.pt"
        torch.save(model.state_dict(), PATH)
        # wandb.log({"Last Epoch Model": wandb.Artifact("last_epoch_model", type="model", description="last_epoch_model")})
        wandb.save(PATH)

        print(); print()


    elapsed_time = time.time() - start_time
    hours, remainder = divmod(elapsed_time, 3600)
    minutes, seconds = divmod(remainder, 60)
    print(f'Training complete in {hours:.0f}h {minutes:.0f}m {seconds:.0f}s')
    print(f"Best Test: {best_test:.4f}")

    wandb.log({"Training Time": elapsed_time})

    # Hand back the model with the weights of the best epoch
    model.load_state_dict(best_model_weights)

    return model, history

Everything is set up, let's go!

In [None]:
# Train the `model` instance created in the setup cell. `optimizer` (and with it
# `scheduler`) was constructed from the parameters of that instance, so a
# ModifiedResNet built here would be passed to run_training without ever being
# updated by the optimizer.
model, history = run_training(model, train_loader, valid_loader, test_loader, optimizer, scheduler, criterion, device, epoch)
# The checkpoints 'best_epoch.pt' and 'last_epoch.pt' are saved in the current directory

In [None]:
# Enumerate the test dataset to get the predictions
label_list = []
pred_list = []

# Load our trained model. map_location makes the checkpoint loadable on the
# current device even if it was saved on a different one.
model.load_state_dict(torch.load('C:/Users/ra78lof/HardOCR/ocr_model_10.25_finetune_4.pt', map_location=device))
# Inference mode: batch normalization must use its running statistics, not the
# statistics of the current batch.
model.eval()

with torch.no_grad():
    for data, target in test_loader:
        data = data.to(device)

        output = model(data)
        # Get the index of the class with the highest score. Softmax is monotonic,
        # so the argmax of the raw scores is the same as that of the probabilities.
        pred = torch.argmax(output, dim=2)
        # Decode on the CPU; the targets never need to be moved to the device.
        label_list += tokenizer.batch_decode(target, skip_special_tokens=True)
        pred_list += tokenizer.batch_decode(pred.cpu(), skip_special_tokens=True)

In [None]:
# Decode one batch of the test dataset to get the predictions
# This is just for debugging purposes  
# Take the first batch of the test loader
data, target = next(iter(test_loader))
data = data.to(device)
target = target.to(device)
with torch.no_grad():
    output = model(data)
# Class probabilities over the last dimension, then the most likely class id
# for every position of the sequence
pred = output.softmax(dim=2)
pred = torch.argmax(pred, dim=2)

# Decode the predictions and labels
# (`pred` is rebound here from a tensor of token ids to a list of strings)
label = tokenizer.batch_decode(target, skip_special_tokens=True)
pred = tokenizer.batch_decode(pred, skip_special_tokens=True)

# Print the predictions and labels
print(f'Label: {label}')
print(f'Prediction: {pred}')

In [None]:
# Store the predictions in an excel file:
# one row per test sample with the decoded label and the decoded prediction
# (label_list and pred_list are filled by the prediction loop above).

pd.DataFrame({'label': label_list, 'pred': pred_list}).to_excel('C:/Users/ra78lof/occinference/ocr_predictions_10.22_10.xlsx', index=False)