In [15]:
# Import necessary libraries
import json
import os
import warnings
from pathlib import Path

import editdistance
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from tqdm import tqdm

In [16]:
# CRNN model for OCR: pretrained EfficientNet-B5 backbone + BiLSTM (https://postimg.cc/kDsRSZbg)
class CRNN(nn.Module):
    """Convolutional-recurrent network that maps an image to per-column class scores.

    Pipeline: EfficientNet-B5 feature extractor -> average pooling over the
    height axis -> 2-layer bidirectional LSTM -> linear classifier.

    Args:
        num_classes: Size of the output alphabet (including the CTC blank).
        hidden_size: Hidden size of each LSTM direction.
        pretrained: If True, load the ImageNet weights for the backbone.

    NOTE(review): the output sequence length equals the width of the backbone's
    feature map. torchvision's EfficientNet presumably downsamples by 32x, so a
    1024-px-wide input would yield only ~32 time steps. CTC needs at least as
    many time steps as target characters -- confirm against the label lengths.
    """

    def __init__(self, num_classes, hidden_size=512, pretrained=True):
        super(CRNN, self).__init__()
        # Build EfficientNet-B5 and keep only its convolutional feature stack.
        # ImageNet weights are loaded when pretrained=True.
        weights = models.EfficientNet_B5_Weights.IMAGENET1K_V1 if pretrained else None
        efficientnet = models.efficientnet_b5(weights=weights)
        self.cnn = efficientnet.features

        # Number of channels produced by the feature stack. Read it from the
        # backbone's own classifier input instead of hard-coding it
        # (2048 for EfficientNet-B5), so it cannot drift from the backbone.
        self.feature_dim = efficientnet.classifier[-1].in_features

        # Collapse the height axis to 1 while leaving the width untouched.
        self.adaptive_pool = nn.AdaptiveAvgPool2d((1, None))

        # Sequence model over the width axis.
        self.rnn = nn.LSTM(
            self.feature_dim,      # input size (number of feature channels)
            hidden_size,           # hidden state size per direction
            bidirectional=True,    # read the sequence in both directions
            num_layers=2,          # stacked LSTM layers
            batch_first=True,      # tensors are (batch, sequence, feature)
            dropout=0.3
        )
        # Final per-time-step classifier; hidden_size * 2 because the LSTM is
        # bidirectional.
        self.fc = nn.Linear(hidden_size * 2, num_classes)

    def forward(self, x):
        """Return per-time-step class logits of shape (batch, W, num_classes)."""
        # CNN feature extraction -> (batch, feature_dim, H, W)
        conv = self.cnn(x)

        # Pool the height axis down to 1 -> (batch, feature_dim, 1, W)
        conv = self.adaptive_pool(conv)

        # Drop the height axis -> (batch, feature_dim, W)
        conv = conv.squeeze(2)

        # Width becomes the sequence axis -> (batch, W, feature_dim)
        conv = conv.permute(0, 2, 1)

        # BiLSTM -> (batch, W, 2 * hidden_size)
        output, _ = self.rnn(conv)

        # Classifier -> (batch, W, num_classes)
        output = self.fc(output)

        return output

In [17]:
# Character Encoder/Decoder
class CharacterEncoder:
    """Bidirectional mapping between characters and integer class indices.

    The alphabet is the sorted set of characters found in the label texts.
    Index 0 is reserved for the CTC blank (stored under the key 'BLANK');
    real characters are numbered from 1.

    Args:
        labels_dict: Mapping whose values are the label strings.
    """

    def __init__(self, labels_dict):
        # Collect every distinct character that appears in any label.
        all_chars = set()
        for text in labels_dict.values():
            all_chars.update(text)

        # Sort so the index assignment is reproducible between runs.
        chars = sorted(all_chars)

        # 0 reserved for CTC blank
        self.char_to_idx = {char: idx + 1 for idx, char in enumerate(chars)}
        self.char_to_idx['BLANK'] = 0
        self.idx_to_char = {idx: char for char, idx in self.char_to_idx.items()}

        print(f"Total unique characters: {len(chars)}")
        print(f"Characters: {repr(''.join(chars[:50]))}..." if len(chars) > 50 else f"Characters: {repr(''.join(chars))}")

    def encode(self, text):
        """Convert a string to a list of class indices.

        Characters that are not in the alphabet are dropped. They must not be
        mapped to index 0: that index is the CTC blank, and a CTC target
        sequence is not allowed to contain the blank label.
        """
        return [self.char_to_idx[char] for char in text if char in self.char_to_idx]

    def decode(self, indices):
        """Convert class indices back to a string, skipping blanks and unknown indices."""
        chars = []
        for idx in indices:
            if idx != 0 and idx in self.idx_to_char:
                chars.append(self.idx_to_char[idx])
        return ''.join(chars)

    def num_classes(self):
        """Return the number of output classes (characters plus the blank)."""
        return len(self.char_to_idx)

In [18]:
# Dataset class
class OCRDataset(Dataset):
    """Dataset of (image, encoded label, raw label) triples for OCR training.

    Args:
        json_path: JSON file mapping image file name -> label text.
        img_dir: Directory that contains the image files.
        encoder: Object with an ``encode(text)`` method returning class indices.
        transform: Optional callable applied to the loaded PIL image.
        img_height: Height of the blank placeholder image used on load failure.
        img_width: Width of the blank placeholder image used on load failure.
    """

    def __init__(self, json_path, img_dir, encoder, transform=None, img_height=32, img_width=256):
        with open(json_path, 'r', encoding='utf-8') as f:
            self.data = json.load(f)
        self.img_dir = Path(img_dir)
        self.transform = transform
        self.img_height = img_height
        self.img_width = img_width
        self.encoder = encoder
        # Fix the sample order once; indices refer to positions in this list.
        self.image_paths = list(self.data.keys())

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, LongTensor of class indices, label text)`` for sample ``idx``."""
        img_name = self.image_paths[idx]
        img_path = self.img_dir / img_name
        text = self.data[img_name]
        try:
            image = Image.open(img_path).convert('RGB')
        except Exception as exc:
            # Best-effort fallback: keep the epoch running with a blank image,
            # but say so -- a blank image paired with a real label would
            # otherwise degrade training without any visible sign.
            warnings.warn(
                f"Could not load image {img_path} ({exc}); substituting a blank image.",
                RuntimeWarning,
            )
            image = Image.new('RGB', (self.img_width, self.img_height), color='white')
        if self.transform:
            image = self.transform(image)
        encoded_text = self.encoder.encode(text)
        return image, torch.LongTensor(encoded_text), text

In [19]:
# ...existing code...
from collections import defaultdict
import math

# CTC Decoder with Beam Search
def ctc_decode(predictions, encoder, beam_width=5):
    """Decode a batch of CTC logits with a prefix beam search.

    Args:
        predictions: Tensor of raw logits, indexed (batch, time, class);
            class 0 is the CTC blank.
        encoder: Object with a ``decode(indices)`` method turning class indices
            into text.
        beam_width: Number of prefixes kept after each time step, and also the
            number of top-scoring classes considered per time step.

    Returns:
        List with one decoded string per batch element.
    """
    neg_inf = -float('inf')

    def _total_log_prob(item):
        # item is (prefix, (log P ending in blank, log P ending in non-blank))
        return np.logaddexp(item[1][0], item[1][1])

    # numpy needs host memory
    if predictions.is_cuda:
        predictions = predictions.cpu()

    # Work in log space so path probabilities are added instead of multiplied.
    log_probs = torch.nn.functional.log_softmax(predictions, dim=2).detach().numpy()

    results = []
    for sample in log_probs:
        # Start with the empty prefix: probability 1 of "ending in blank".
        hypotheses = {(): (0.0, neg_inf)}
        n_candidates = min(beam_width, sample.shape[1])

        for frame in sample:
            candidates = np.argsort(frame)[-n_candidates:]
            blank_lp = frame[0]
            grown = defaultdict(lambda: (neg_inf, neg_inf))

            for prefix, (lp_blank, lp_char) in hypotheses.items():
                lp_any = np.logaddexp(lp_blank, lp_char)

                # Emit a blank: the prefix is unchanged and now ends in blank.
                acc_blank, acc_char = grown[prefix]
                grown[prefix] = (np.logaddexp(acc_blank, lp_any + blank_lp), acc_char)

                for symbol in candidates:
                    if symbol == 0:
                        # blank was handled above
                        continue
                    symbol_lp = frame[symbol]
                    extended = prefix + (symbol,)

                    if len(prefix) > 0 and prefix[-1] == symbol:
                        # Same symbol again without a blank in between: CTC
                        # merges it into the existing last character.
                        acc_blank, acc_char = grown[prefix]
                        grown[prefix] = (acc_blank, np.logaddexp(acc_char, lp_char + symbol_lp))
                        # A genuine double letter is only reachable via a blank.
                        incoming = lp_blank + symbol_lp
                    else:
                        # A new character can follow either a blank or a non-blank.
                        incoming = lp_any + symbol_lp

                    acc_blank, acc_char = grown[extended]
                    grown[extended] = (acc_blank, np.logaddexp(acc_char, incoming))

            # Prune to the most probable prefixes.
            ranked = sorted(grown.items(), key=_total_log_prob, reverse=True)
            hypotheses = dict(ranked[:beam_width])

        winner = max(hypotheses.items(), key=_total_log_prob)[0]
        results.append(encoder.decode(winner))
    return results

In [20]:
from torch.cuda.amp import autocast, GradScaler

def train_epoch(model, dataloader, criterion, optimizer, device, encoder, epoch):
    """Train ``model`` for one epoch with CTC loss and return the mean batch loss.

    Mixed precision (autocast + gradient scaling) is used only when ``device``
    is a CUDA device; on any other device the step runs in full precision.

    Args:
        model: Network returning logits indexed (batch, time, class).
        dataloader: Iterable of ``(images, targets, target_texts)`` batches,
            where ``targets`` is a list of 1-D index tensors.
        criterion: CTC loss taking ``(log_probs, targets, input_lengths, target_lengths)``.
        optimizer: Optimizer over the model parameters.
        device: Device the batch is moved to.
        encoder: Unused here; kept so the signature stays unchanged.
        epoch: Epoch number, shown in the progress bar.

    Returns:
        Mean loss over the batches seen, or 0.0 if the dataloader was empty.
    """
    # AMP only applies to CUDA. Gating it avoids the "CUDA is not available"
    # warnings on CPU and scaler device checks when training on CPU.
    use_amp = torch.device(device).type == 'cuda'

    # The scaler is cached on the function so its scale factor persists across
    # epochs; rebuild it if the AMP mode changed between calls.
    scaler = getattr(train_epoch, "scaler", None)
    if scaler is None or scaler.is_enabled() != use_amp:
        scaler = GradScaler(enabled=use_amp)
        train_epoch.scaler = scaler

    model.train()
    total_loss = 0.0
    num_batches = 0

    pbar = tqdm(dataloader, desc=f'Epoch {epoch} [Train]', ncols=100)

    for images, targets, _target_texts in pbar:
        images = images.to(device)
        optimizer.zero_grad(set_to_none=True)

        with autocast(enabled=use_amp):
            # CTC loss expects (time, batch, class).
            outputs = model(images).permute(1, 0, 2)
            seq_len, batch_size = outputs.size(0), outputs.size(1)

            # Every sample uses the full output sequence.
            input_lengths = torch.full(
                (batch_size,),
                seq_len,
                dtype=torch.long,
                device=device
            )
            target_lengths = torch.tensor(
                [len(t) for t in targets], dtype=torch.long, device=device
            )
            # Targets are passed in concatenated (1-D) form.
            targets_flat = torch.cat(targets).to(device)

            loss = criterion(
                outputs.log_softmax(2),
                targets_flat,
                input_lengths,
                target_lengths
            )

        scaler.scale(loss).backward()
        # Unscale before clipping so the threshold applies to the true gradients.
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(model.parameters(), 5.0)
        scaler.step(optimizer)
        scaler.update()

        batch_loss = loss.item()  # one host sync per batch
        total_loss += batch_loss
        num_batches += 1
        pbar.set_postfix({'loss': f'{batch_loss:.4f}'})

    return total_loss / num_batches if num_batches else 0.0


In [21]:
# Validation function
def validate(model, dataloader, criterion, device, encoder, dataset_name="Valid"):
    """Evaluate ``model`` and return ``(mean loss, sentence accuracy %, mean edit distance)``.

    Args:
        model: Network returning logits indexed (batch, time, class).
        dataloader: Iterable of ``(images, targets, target_texts)`` batches.
        criterion: CTC loss taking ``(log_probs, targets, input_lengths, target_lengths)``.
        device: Device the batch and the loss inputs are moved to.
        encoder: Passed to ``ctc_decode`` to turn indices into text.
        dataset_name: Label shown in the progress bar.

    Returns:
        Tuple of the mean batch loss, the percentage of samples whose decoded
        text equals the label exactly (both stripped of surrounding
        whitespace), and the mean Levenshtein distance per sample. All three
        are 0 when the dataloader yields nothing.
    """
    model.eval()
    total_loss = 0.0
    num_batches = 0
    total_samples = 0
    correct_sent = 0
    total_edit_dist = 0.0

    pbar = tqdm(dataloader, desc=f'{dataset_name:>7}', ncols=100, leave=False)

    with torch.no_grad():
        for images, targets, target_texts in pbar:
            images = images.to(device)

            # Forward; the loss wants (time, batch, class), the decoder wants
            # the original (batch, time, class).
            outputs = model(images)
            outputs_for_loss = outputs.permute(1, 0, 2)
            seq_len, batch_size = outputs_for_loss.size(0), outputs_for_loss.size(1)

            # CTC loss inputs, placed on `device` exactly as in training.
            input_lengths = torch.full((batch_size,), seq_len, dtype=torch.long, device=device)
            target_lengths = torch.tensor([len(t) for t in targets], dtype=torch.long, device=device)
            targets_flat = torch.cat(targets).to(device)
            loss = criterion(outputs_for_loss.log_softmax(2), targets_flat, input_lengths, target_lengths)
            total_loss += loss.item()
            num_batches += 1

            # Decode
            predictions = ctc_decode(outputs, encoder)

            # Evaluate
            for pred, true in zip(predictions, target_texts):
                pred, true = pred.strip(), true.strip()
                total_samples += 1

                if pred == true:
                    correct_sent += 1

                total_edit_dist += editdistance.eval(pred, true)

            current_acc = 100 * correct_sent / total_samples if total_samples > 0 else 0
            pbar.set_postfix({'acc': f'{current_acc:.2f}%'})

    # Average metrics (guarded so an empty loader does not divide by zero)
    avg_loss = total_loss / num_batches if num_batches > 0 else 0.0
    sent_acc = 100 * correct_sent / total_samples if total_samples > 0 else 0
    avg_lev_dist = total_edit_dist / total_samples if total_samples > 0 else 0

    return avg_loss, sent_acc, avg_lev_dist

In [None]:
# Training configuration: dataset locations, input size, and optimisation settings.
CONFIG = {
    'train_json': 'dataset/dataset/train.json',
    'train_img_dir': 'dataset/dataset/train/images',
    'valid_json': 'dataset/dataset/valid.json',
    'valid_img_dir': 'dataset/dataset/valid/images',
    'img_height': 128,
    'img_width': 1024,
    'batch_size': 8,
    'num_epochs': 60,
    'learning_rate': 0.0002,
    'num_workers': 8,
    'save_dir': './output/',
    'use_pretrained': True,
}

os.makedirs(CONFIG['save_dir'], exist_ok=True)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Device: {device}\n')

# The character alphabet is built from the training labels only.
with open(CONFIG['train_json'], 'r', encoding='utf-8') as f:
    train_labels = json.load(f)
encoder = CharacterEncoder(train_labels)

# Pieces shared by every pipeline: fixed input size and ImageNet normalisation
# (matching the statistics the pretrained backbone was trained with).
image_size = (CONFIG['img_height'], CONFIG['img_width'])
imagenet_normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

# Training pipeline: resize, light geometric/photometric augmentation, normalise.
train_transform = transforms.Compose([
    transforms.Resize(image_size),
    transforms.RandomAffine(degrees = 10, translate=(0.1, 0.1)),
    transforms.ColorJitter(0.1, 0.1, 0.1, 0.1),
    transforms.RandomGrayscale(p = 0.2),
    transforms.RandomAdjustSharpness(sharpness_factor=2, p=0.2),
    transforms.RandomAutocontrast(p=0.2),
    transforms.RandomApply([transforms.GaussianBlur(3)], p=0.2),
    transforms.ToTensor(),
    imagenet_normalize,
])

# Evaluation pipeline: deterministic, no augmentation.
val_transform = transforms.Compose([
    transforms.Resize(image_size),
    transforms.ToTensor(),
    imagenet_normalize,
])
# `transform` used to be a second, identical copy of the evaluation pipeline;
# the name is kept as an alias in case other cells refer to it.
transform = val_transform

train_dataset = OCRDataset(CONFIG['train_json'], CONFIG['train_img_dir'], encoder, train_transform)
valid_dataset = OCRDataset(CONFIG['valid_json'], CONFIG['valid_img_dir'], encoder, val_transform)
print(f"Train: {len(train_dataset)} | Valid: {len(valid_dataset)}\n")


def collate_fn(batch):
    """Stack the images; keep the variable-length targets and the texts as lists."""
    images = torch.stack([item[0] for item in batch])
    targets = [item[1] for item in batch]
    texts = [item[2] for item in batch]
    return images, targets, texts


# NOTE(review): with num_workers > 0 the dataset and collate function are sent to
# worker processes. On platforms that spawn workers (e.g. Windows), objects
# defined inside a notebook may not be importable there -- if the first epoch
# never advances, try num_workers = 0. Confirm on the target machine.
train_loader = DataLoader(train_dataset, batch_size=CONFIG['batch_size'], shuffle=True,
                          num_workers=CONFIG['num_workers'], collate_fn=collate_fn, pin_memory=True)
valid_loader = DataLoader(valid_dataset, batch_size=CONFIG['batch_size'], shuffle=False,
                          num_workers=CONFIG['num_workers'], collate_fn=collate_fn, pin_memory=True)

model = CRNN(encoder.num_classes(), hidden_size=256, pretrained=CONFIG['use_pretrained']).to(device)
# zero_infinity=True zeroes the loss of samples whose label cannot be aligned
# to the output sequence instead of propagating an infinite loss.
criterion = nn.CTCLoss(blank=0, zero_infinity=True)
optimizer = optim.AdamW(model.parameters(), lr=CONFIG['learning_rate'], weight_decay=0.001)
# Halve the learning rate when the validation loss stalls for 3 epochs.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

best_valid_acc = 0
print("Starting training...\n")

for epoch in range(1, CONFIG['num_epochs'] + 1):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device, encoder, epoch)
    valid_loss, valid_acc, valid_lev = validate(model, valid_loader, criterion, device, encoder)
    scheduler.step(valid_loss)
    print(f"Epoch {epoch:2d} | Train Loss: {train_loss:.4f} | Valid Acc: {valid_acc:.2f}% | Valid Loss: {valid_loss:.4f}")
    if valid_acc > best_valid_acc:
        best_valid_acc = valid_acc
        save_path = os.path.join(CONFIG['save_dir'], 'best_model.pth')
        # Actually write the checkpoint: the message below claims it was saved
        # and the later cells load this file.
        torch.save(model.state_dict(), save_path)
        print(f"✅ Saved best model (Acc: {best_valid_acc:.2f}%)")

print("\nTraining complete!")
print(f"Best Validation Accuracy: {best_valid_acc:.2f}%")

Device: cuda

Total unique characters: 117
Characters: ' !"&\'(),-./0123456789:;?abcdefghijklmnopqrstuvwxyz'...
Train: 12225 | Valid: 1897



Downloading: "https://download.pytorch.org/models/efficientnet_b5_lukemelas-1a07897c.pth" to C:\Users\Admin/.cache\torch\hub\checkpoints\efficientnet_b5_lukemelas-1a07897c.pth
100%|██████████| 117M/117M [00:20<00:00, 5.97MB/s] 
  train_epoch.scaler = GradScaler()


Starting training...



Epoch 1 [Train]:   0%|                                                     | 0/1529 [00:00<?, ?it/s]

In [None]:
# Fine-tuning run: same settings as the initial training run, but with a much
# lower learning rate.
CONFIG = {
    'train_json': 'dataset/dataset/train.json',
    'train_img_dir': 'dataset/dataset/train/images',
    'valid_json': 'dataset/dataset/valid.json',
    'valid_img_dir': 'dataset/dataset/valid/images',
    'img_height': 128,
    'img_width': 1024,
    'batch_size': 8,
    'num_epochs': 60,
    'learning_rate': 0.00001,
    'num_workers': 8,
    'save_dir': './output/',
    'use_pretrained': True,
}

# NOTE: this cell relies on `encoder`, `device`, `train_loader` and
# `valid_loader` created by the training cell above; run that cell first.
model = CRNN(encoder.num_classes(), hidden_size=256, pretrained=CONFIG['use_pretrained']).to(device)
criterion = nn.CTCLoss(blank=0, zero_infinity=True)
optimizer = optim.AdamW(model.parameters(), lr=CONFIG['learning_rate'], weight_decay=0.001)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

# Resume from the best checkpoint of the previous run when there is one.
best_model_path = os.path.join(CONFIG['save_dir'], 'best_model.pth')
best_valid_acc = 0
if os.path.exists(best_model_path):
    model.load_state_dict(torch.load(best_model_path, map_location=device))
    print(f"\nLoaded best model from: {best_model_path}")
    # Start from the checkpoint's own accuracy. With a baseline of 0 the first
    # fine-tuning epoch would overwrite the checkpoint even if it were worse.
    _, best_valid_acc, _ = validate(model, valid_loader, criterion, device, encoder)
    print(f"Checkpoint validation accuracy: {best_valid_acc:.2f}%")
else:
    print(f"No checkpoint found at {best_model_path}; training from the initial weights.")
print("Starting training...\n")

for epoch in range(1, CONFIG['num_epochs'] + 1):
    train_loss = train_epoch(model, train_loader, criterion, optimizer, device, encoder, epoch)
    valid_loss, valid_acc, valid_lev = validate(model, valid_loader, criterion, device, encoder)
    scheduler.step(valid_loss)
    print(f"Epoch {epoch:2d} | Train Loss: {train_loss:.4f} | Valid Acc: {valid_acc:.2f}% | Valid Loss: {valid_loss:.4f}")
    if valid_acc > best_valid_acc:
        best_valid_acc = valid_acc
        save_path = os.path.join(CONFIG['save_dir'], 'best_model.pth')
        torch.save(model.state_dict(), save_path)
        print(f"✅ Saved best model (Acc: {best_valid_acc:.2f}%)")

print("\nTraining complete!")
print(f"Best Validation Accuracy: {best_valid_acc:.2f}%")

NameError: name 'encoder' is not defined

In [None]:
def predict_and_save(model, encoder, img_dir, output_json, device, img_height=32, img_width=1024):
    """Run the model on every file in ``img_dir`` and write the texts to a JSON file.

    Args:
        model: Network returning logits indexed (batch, time, class).
        encoder: Passed to ``ctc_decode`` to turn indices into text.
        img_dir: Directory whose entries are treated as images.
        output_json: Path of the JSON file to write (file name -> predicted text).
        device: Device the image tensor is moved to.
        img_height: Height images are resized to. It should match the size the
            model was trained with; note the default differs from the 128 used
            by the training configuration.
        img_width: Width images are resized to.

    Entries that cannot be opened as images are reported and skipped.
    """
    model.eval()
    predictions = {}

    # Deterministic preprocessing: resize + ImageNet normalisation, no augmentation.
    transform = transforms.Compose([
        transforms.Resize((img_height, img_width)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])

    # Sorted so the output order is reproducible.
    img_names = sorted(os.listdir(img_dir))
    print(f"Predicting {len(img_names)} images from: {img_dir}")

    with torch.no_grad():
        for img_name in tqdm(img_names):
            img_path = os.path.join(img_dir, img_name)

            # Load and preprocess image
            try:
                image = Image.open(img_path).convert('RGB')
            except Exception as e:
                print(f"Error loading image {img_path}: {e}")
                continue

            image = transform(image).unsqueeze(0).to(device)  # shape: (1, 3, H, W)

            # Forward pass
            output = model(image)  # shape: (1, W, num_classes)

            # Decode with the CTC beam-search decoder (batch of one)
            pred_text = ctc_decode(output, encoder)[0]

            # Store prediction
            predictions[img_name] = pred_text.strip()

    # Create the output directory if the path has one. A bare file name has an
    # empty dirname, and os.makedirs('') raises.
    output_dir = os.path.dirname(output_json)
    if output_dir:
        os.makedirs(output_dir, exist_ok=True)
    with open(output_json, 'w', encoding='utf-8') as f:
        json.dump(predictions, f, ensure_ascii=False, indent=2)

    print(f"Saved predictions to: {output_json}")

In [None]:
    # Restore the best checkpoint and write predictions for both test splits.
    # NOTE(review): this cell is indented one level although it sits at the top
    # level -- presumably lifted from inside a function; dedent when convenient.
    best_model_path = os.path.join(CONFIG['save_dir'], 'best_model.pth')
    if not os.path.exists(best_model_path):
        print("Best model not found. Skipping prediction.")
    else:
        model.load_state_dict(torch.load(best_model_path, map_location=device))
        print(f"\nLoaded best model from: {best_model_path}")

        # (image directory, output file): public test set first, then private.
        test_splits = [
            ('dataset/public_test/images', './public_test.json'),
            ('dataset/private_test/images', './private_test.json'),
        ]
        for split_img_dir, split_output_json in test_splits:
            predict_and_save(
                model, encoder,
                img_dir=split_img_dir,
                output_json=split_output_json,
                device=device,
                img_height=CONFIG['img_height'],
                img_width=CONFIG['img_width']
            )