---------------

# **<font style="color:Black">Building an OCR Model with PyTorch</font>**
-------------------
-----------------

## **<font style="color:blue">Installation and import libraries</font>**
-------------------

In [1]:
!pip install  pillow 
!pip install opencv-python
!pip install matplotlib



In [2]:
import os
import sys
import shutil
import random
from PIL import Image, ImageDraw, ImageFont, ImageEnhance, ImageFilter
import numpy as np
import cv2

import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F  # Add this import
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms

## **<font style="color:blue">Hyperparameters</font>**
-------------------

In [3]:
# Output locations inside the Kaggle working directory.
OUTPUT_DIR = os.path.join('/kaggle','working','synthetic_data','images')
MODEL_DIR = os.path.join('/kaggle','working','model_dir')
LABELS_FILE = os.path.join('/kaggle','working','synthetic_data','labels.txt')
NUM_SAMPLES = 5120  # Number of synthetic images to generate
IMG_WIDTH = 128  # Width of every generated image, in pixels
IMG_HEIGHT = 32  # Height of every generated image, in pixels
BATCH_SIZE = 32
EPOCHS = 10
LEARNING_RATE = 1e-5  # Passed to Adam; also the value the linear warm-up in train_model ramps up to
WEIGHT_DECAY = 1e-4  # Passed to Adam as weight_decay
# Length of the linear learning-rate warm-up, counted in optimizer steps.
# NOTE(review): NUM_SAMPLES / BATCH_SIZE gives at most 160 batches per epoch, so
# 1000 steps spans several epochs rather than about one - TODO confirm this is intended.
WARMUP_STEPS = 1000
ENTROPY_WEIGHT = 2.0  # Passed as entropy_weight to CTCLossWithBlankPenalty
TEMPERATURE = 0.2     # OCRModel.forward divides the logits by this; values below 1 sharpen the softmax
BEAM_WIDTH = 10  # Default beam width of beam_search_decode
CHARSET = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789-*"  # Characters used for random text; mapped to indices 1..len(CHARSET), index 0 is the CTC blank
MAX_TEXT_LENGTH = 10  # Maximum length of text in an image
FONT_DIR = os.path.join('/kaggle','input','google-fonts','GoogleFontScripts') # Folder with TrueType fonts (.ttf)
BACKGROUND_DIR = os.path.join('/kaggle','working','backgrounds')  # Folder the generated backgrounds are saved to and later loaded from
NUMBER_BACKGROUND_IMAGE = 50  # Number of backgrounds generated per background type (gradient and paper)

## **<font style="color:blue">Utils support functions</font>**
-------------------

### **<font style="color:green">Create output folders</font>**

In [4]:
# Make sure every directory this notebook writes to exists before it is used.
for required_dir in (OUTPUT_DIR, MODEL_DIR, BACKGROUND_DIR):
    os.makedirs(required_dir, exist_ok=True)

# Create the labels file, or truncate it to zero length if it already exists.
open(LABELS_FILE, 'w').close()

### **<font style="color:green">Load the font list (place the .ttf files in the folder set by FONT_DIR)</font>**

In [5]:
# Collect every TrueType font that sits directly inside FONT_DIR.
# - The extension check is case-insensitive so files such as "Foo.TTF" are found too.
# - The list is sorted because os.listdir() returns entries in arbitrary order, which
#   would otherwise make random.choice(font_files) non-reproducible across runs.
font_files = sorted(
    os.path.join(FONT_DIR, f) for f in os.listdir(FONT_DIR)
    if f.lower().endswith('.ttf') and os.path.isfile(os.path.join(FONT_DIR, f))
)
if not font_files:
    # Report the directory that was actually searched.
    raise FileNotFoundError(f"No .ttf fonts found in '{FONT_DIR}'. Add .ttf files!")

### **<font style="color:green">Generating a simple gradient background</font>**

In [6]:
def generate_gradient_background(filename, size=(128, 32)):
    """Render a blurred vertical grey gradient and save it into BACKGROUND_DIR.

    Args:
        filename: File name (not a path) of the image to write.
        size: (width, height) of the image in pixels.
    """
    width, height = size
    canvas = Image.new('L', size, color=230)  # Light grey base
    pen = ImageDraw.Draw(canvas)
    for row in range(height):
        # Low-contrast ramp: 230 on the first row, approaching 210 towards the last
        shade = int(230 - 20 * (row / height))
        pen.line([(0, row), (width, row)], fill=shade)
    # Blur so the row-to-row steps are not visible
    blurred = canvas.filter(ImageFilter.GaussianBlur(radius=2))
    blurred.save(os.path.join(BACKGROUND_DIR, filename))

### **<font style="color:green">Generate a background with noise (paper texture)</font>**

In [7]:
def generate_paper_texture(filename, size=(128, 32)):
    """Render a light-grey "paper" background with mild grain and save it.

    The image is a constant grey level of 220 plus zero-mean Gaussian noise
    (standard deviation 5), slightly blurred, written to BACKGROUND_DIR.

    Args:
        filename: File name (not a path) of the image to write.
        size: (width, height) of the image in pixels.
    """
    width, height = size
    # NumPy image arrays are indexed (row, column) = (height, width); using `size`
    # directly would produce a transposed noise field that covers only part of the image.
    noise = np.random.normal(0, 5, (height, width))
    # Add the signed noise in floating point and clip before converting to uint8.
    # Casting the raw noise to uint8 would wrap negative samples around to ~250.
    pixels = np.clip(220 + noise, 0, 255).astype(np.uint8)
    img = Image.fromarray(pixels)
    # Blur for a softer effect
    img = img.filter(ImageFilter.GaussianBlur(radius=1.5))
    img.save(os.path.join(BACKGROUND_DIR, filename))

### **<font style="color:green">Creating multiple backgrounds</font>**

In [8]:
# Write NUMBER_BACKGROUND_IMAGE backgrounds of each kind (gradient and paper texture).
for bg_index in range(NUMBER_BACKGROUND_IMAGE):
    gradient_name = f"gradient_{bg_index}.png"
    paper_name = f"paper_{bg_index}.png"
    generate_gradient_background(gradient_name)
    generate_paper_texture(paper_name)

### **<font style="color:green">Load backgrounds (optional, add images to the "backgrounds" folder)</font>**

In [9]:
# Paths of all background images found in BACKGROUND_DIR (empty when the folder is missing).
if os.path.exists(BACKGROUND_DIR):
    background_files = [
        os.path.join(BACKGROUND_DIR, entry)
        for entry in os.listdir(BACKGROUND_DIR)
        if entry.endswith(('.png', '.jpg', '.jpeg'))
    ]
else:
    background_files = []

### **<font style="color:green">Random text generation function</font>**

In [10]:
def generate_random_text(max_length):
    """Return a random string of 1..max_length characters drawn from CHARSET.

    Args:
        max_length: Upper bound (inclusive) for the length of the string.

    Returns:
        The generated string; every character is picked independently.
    """
    n_chars = random.randint(1, max_length)
    picked = []
    for _ in range(n_chars):
        picked.append(random.choice(CHARSET))
    return ''.join(picked)

### **<font style="color:green">Functions for adding noise and distortions</font>**

In [11]:
def add_noise_and_distortion(img):
    """Apply optional Gaussian noise and a small random perspective warp.

    Args:
        img: Single-channel (mode 'L') PIL image; the array shape is unpacked
            as (rows, cols), so multi-channel images are not supported.

    Returns:
        A new PIL image of the same size with the augmentations applied.
    """
    img_array = np.array(img)
    # Mild Gaussian noise, applied with 50% probability
    if random.random() > 0.5:
        sigma = random.randint(5, 15)
        noise = np.random.normal(0, sigma, img_array.shape)
        # Add the signed noise in floating point and clip to the valid range.
        # Casting the noise itself to uint8 would wrap negative samples to ~250,
        # and a saturating add would then turn about half of the pixels white.
        noisy = img_array.astype(np.float32) + noise
        img_array = np.clip(noisy, 0, 255).astype(np.uint8)
    # Subtle perspective distortion: every corner moves inwards by up to 3 pixels
    rows, cols = img_array.shape
    src_points = np.float32([[0, 0], [cols-1, 0], [0, rows-1], [cols-1, rows-1]])
    dst_points = np.float32([
        [random.uniform(0, 3), random.uniform(0, 3)],
        [cols-1-random.uniform(0, 3), random.uniform(0, 3)],
        [random.uniform(0, 3), rows-1-random.uniform(0, 3)],
        [cols-1-random.uniform(0, 3), rows-1-random.uniform(0, 3)]
    ])
    matrix = cv2.getPerspectiveTransform(src_points, dst_points)
    img_array = cv2.warpPerspective(img_array, matrix, (cols, rows))
    return Image.fromarray(img_array)

### **<font style="color:green">Synthetic image generation function</font>**

In [12]:
def generate_synthetic_image(text, font_path, img_size=(IMG_WIDTH, IMG_HEIGHT)):
    """Render `text` onto a background and return the augmented grayscale image.

    The text is always rendered in full. When it does not fit, only the font size
    is reduced; the string itself is never shortened, because the caller stores
    the string it passed in as the label of the returned image.

    Args:
        text: String to render.
        font_path: Path to a TrueType font file.
        img_size: (width, height) of the output image in pixels.

    Returns:
        A mode 'L' PIL image of size `img_size`, after add_noise_and_distortion.
    """
    width, height = img_size
    h_margin, v_margin = 5, 2   # Minimum free space around the rendered text
    min_font_size = 8           # Do not shrink the font below this size

    # Background: a random pre-generated image, or a gradient drawn on the fly
    if background_files:
        bg_path = random.choice(background_files)
        # Context manager closes the file handle once the pixels have been copied
        with Image.open(bg_path) as background:
            img = background.convert('L').resize(img_size)
    else:
        img = Image.new('L', img_size, color=230)
        draw = ImageDraw.Draw(img)
        for y in range(height):
            color = int(230 - 20 * (y / height))
            draw.line([(0, y), (width, y)], fill=color)
        img = img.filter(ImageFilter.GaussianBlur(radius=2))

    draw = ImageDraw.Draw(img)

    # Start from a random font size (20..28 for the default 32 px height) and
    # shrink it until the inked bounding box fits inside the margins.
    largest_start = max(min_font_size, min(height - 2, 28))
    font_size = random.randint(min(20, largest_start), largest_start)
    while True:
        font = ImageFont.truetype(font_path, font_size)
        left, top, right, bottom = draw.textbbox((0, 0), text, font=font)
        text_width = int(np.ceil(right - left))
        text_height = int(np.ceil(bottom - top))
        fits = (text_width <= width - 2 * h_margin
                and text_height <= height - 2 * v_margin)
        if fits or font_size <= min_font_size:
            # At the minimum size the text is drawn even if it is still too large
            break
        font_size = max(min_font_size, font_size - 2)

    # Text position: choose where the inked box goes, then subtract the bbox
    # offset, since draw.text() places the ink at (x + left, y + top).
    ink_x = random.randint(h_margin, max(h_margin, width - text_width - h_margin))
    ink_y = random.randint(v_margin, max(v_margin, height - text_height - v_margin))
    x, y = ink_x - left, ink_y - top

    # Light outline around dark text to make the glyphs stand out
    text_color = random.randint(0, 50)
    outline_color = 200
    for offset_x in [-1, 0, 1]:
        for offset_y in [-1, 0, 1]:
            if offset_x != 0 or offset_y != 0:
                draw.text((x + offset_x, y + offset_y), text, font=font, fill=outline_color)
    draw.text((x, y), text, font=font, fill=text_color)

    # Noise and distortion
    img = add_noise_and_distortion(img)
    return img

### **<font style="color:green">Function for splitting labels</font>**

In [13]:
def split_labels(labels, label_lengths):
    """Cut a flat label sequence into consecutive pieces of the given lengths.

    Args:
        labels: Flat, sliceable sequence holding all labels back to back.
        label_lengths: Iterable with the length of each individual label.

    Returns:
        A list with one slice of `labels` per entry of `label_lengths`.
    """
    pieces = []
    offset = 0
    for piece_length in label_lengths:
        end = offset + piece_length
        pieces.append(labels[offset:end])
        offset = end
    return pieces

### **<font style="color:green">Beam search decoding function</font>**

In [14]:
def beam_search_decode(output, idx_to_char, beam_width=BEAM_WIDTH):
    """Decode CTC outputs with a beam search over frame-level paths.

    Args:
        output: Tensor indexed as [T, B, C] (time, batch, classes); a softmax is
            applied over the last dimension, so logits or log-probabilities work.
        idx_to_char: Mapping from class index to character. Index 0 is the blank.
        beam_width: Number of candidate paths kept after every time step.

    Returns:
        A list with one decoded string per batch element; '<empty>' when nothing
        but blanks was decoded.

    NOTE(review): beams are raw frame-level paths and paths that collapse to the
    same text are not merged, so the best beam is the per-frame argmax path; this
    matches greedy CTC decoding. Confirm whether a prefix beam search was intended.
    """
    probs = output.detach().softmax(2)  # [T, B, C]
    T, B, C = probs.shape
    beam_limit = max(1, int(beam_width))
    k = min(beam_limit, C)  # topk cannot return more entries than there are classes

    # The top-k candidates of a frame do not depend on the beam being extended,
    # so compute them once for all frames. Clamp before the log to avoid log(0).
    top_probs, top_idx = probs.topk(k, dim=2)
    tiny = torch.finfo(torch.float64).tiny
    top_log_probs = top_probs.double().clamp_min(tiny).log().cpu().numpy()
    top_idx = top_idx.cpu().numpy()

    predictions = []
    for b in range(B):
        beams = [(0.0, [])]  # (log_prob, sequence)
        for t in range(T):
            candidates = [
                (log_prob + float(step_log_prob), seq + [int(class_idx)])
                for log_prob, seq in beams
                for step_log_prob, class_idx in zip(top_log_probs[t, b], top_idx[t, b])
            ]
            beams = sorted(candidates, key=lambda c: c[0], reverse=True)[:beam_limit]

        # Collapse CTC repeats and remove blanks
        best_seq = beams[0][1]  # Highest log_prob sequence
        decoded = []
        prev = -1
        for idx in best_seq:
            if idx != 0 and idx != prev:  # Skip blanks (0) and repeats
                decoded.append(idx_to_char.get(idx, ''))
            prev = idx
        predictions.append(''.join(decoded) if decoded else '<empty>')

    return predictions

### **<font style="color:green">Custom collate function</font>**

In [15]:
def custom_collate_fn(batch):
    """Collate (image, label, label_length) samples into batch tensors.

    Args:
        batch: Sequence of 3-tuples (image tensor, 1-D label tensor, label length).

    Returns:
        Tuple (images, labels, label_lengths): the images stacked along a new
        first dimension, all labels concatenated into one flat tensor, and the
        lengths as a long tensor.
    """
    per_sample_images, per_sample_labels, per_sample_lengths = zip(*batch)
    return (
        torch.stack(per_sample_images, dim=0),    # requires equally sized images
        torch.cat(per_sample_labels, dim=0),      # flat layout, split again via the lengths
        torch.tensor(per_sample_lengths, dtype=torch.long),
    )

### **<font style="color:green">Dataset generation</font>**

In [16]:
def create_synthetic_dataset(num_samples):
    """Generate synthetic text images and write the labels file.

    For every sample a random text and a random font are chosen, the rendered
    image is saved to OUTPUT_DIR as img_<index>.png, and a line
    "<image name><TAB><text>" is collected. All lines are written to
    LABELS_FILE at the end.

    Args:
        num_samples: Number of images to generate.
    """
    label_lines = []
    for sample_idx in range(num_samples):
        text = generate_random_text(MAX_TEXT_LENGTH)
        if text:
            chosen_font = random.choice(font_files)
            rendered = generate_synthetic_image(text, chosen_font)
            img_name = f"img_{sample_idx:05d}.png"
            rendered.save(os.path.join(OUTPUT_DIR, img_name))
            # Tab-separated, since OCRDataset splits each line on '\t'
            label_lines.append(f"{img_name}\t{text}")
            if sample_idx % 100 == 0:
                print(f"Generated {sample_idx}/{num_samples} images")

    if not label_lines:
        print("No labels generated!")
        return

    with open(LABELS_FILE, 'w') as f:
        f.write("\n".join(label_lines))
    print(f"Dataset generated! Images saved in '{OUTPUT_DIR}', labels in '{LABELS_FILE}'")

## **<font style="color:blue">Mapping characters to indices and back</font>**
-------------------

In [17]:
# Character -> class index, starting at 1 because index 0 is reserved for the CTC blank.
char_to_idx = dict(zip(CHARSET, range(1, len(CHARSET) + 1)))
# Inverse lookup used when decoding predictions back to text.
idx_to_char = {index: symbol for symbol, index in char_to_idx.items()}

## **<font style="color:blue">Custom dataset</font>**
-------------------

In [18]:
class OCRDataset(Dataset):
    """Dataset of (image, encoded label, label length) read from a labels file.

    Each non-empty line of the labels file must be "<image name><TAB><text>".
    The parsed records are kept in `self.data` as (image name, text, len(text)).
    """

    def __init__(self, image_dir, labels_file):
        """Parse `labels_file`; images are looked up relative to `image_dir`."""
        self.image_dir = image_dir
        self.labels_file = labels_file
        self.data = []
        with open(labels_file, 'r') as handle:
            for raw_line in handle:
                record = raw_line.strip()
                if record:  # Blank lines are ignored
                    image_path, label = record.split('\t')
                    self.data.append((image_path, label, len(label)))

    def __len__(self):
        """Number of parsed records."""
        return len(self.data)

    def __getitem__(self, idx):
        """Load sample `idx` as (image tensor, label index tensor, label length)."""
        image_path, label, label_length = self.data[idx]
        full_path = os.path.join(self.image_dir, image_path)
        grayscale = Image.open(full_path).convert('L')
        image = transforms.ToTensor()(grayscale)
        # Characters missing from char_to_idx raise KeyError
        encoded = [char_to_idx[ch] for ch in label]
        return image, torch.tensor(encoded, dtype=torch.long), label_length

## **<font style="color:blue">Custom CTC Loss with Blank Penalty</font>**
-------------------
> CTC Loss with Entropy Regularization

In [19]:
class CTCLossWithBlankPenalty(nn.Module):
    """CTC loss plus a blank-probability penalty minus an entropy bonus.

    total = ctc_loss
            + blank_penalty_weight * mean probability of the blank class
            - entropy_weight * mean per-frame entropy of the output distribution

    `label_smoothing` is stored on the module but is not used in `forward`.
    """

    def __init__(self, blank=0, zero_infinity=True, blank_penalty_weight=0.5, entropy_weight=0.5, label_smoothing=0.1):
        super().__init__()
        self.blank = blank
        self.zero_infinity = zero_infinity
        self.blank_penalty_weight = blank_penalty_weight
        self.entropy_weight = entropy_weight
        self.label_smoothing = label_smoothing

    def forward(self, log_probs, targets, input_lengths, target_lengths):
        """Return the combined scalar loss.

        Args:
            log_probs: Log-probabilities with the class dimension last (dim 2),
                in the layout expected by F.ctc_loss.
            targets, input_lengths, target_lengths: Forwarded to F.ctc_loss.
        """
        base_loss = F.ctc_loss(
            log_probs, targets, input_lengths, target_lengths,
            blank=self.blank, reduction='mean', zero_infinity=self.zero_infinity,
        )

        # Penalty term: average probability mass assigned to the blank class
        blank_mass = log_probs[:, :, self.blank].exp()
        blank_penalty = blank_mass.mean(dim=0).mean()

        # Bonus term: average entropy over all frames and batch elements
        frame_probs = log_probs.exp()
        mean_entropy = -torch.sum(frame_probs * log_probs, dim=2).mean()

        return base_loss + self.blank_penalty_weight * blank_penalty - self.entropy_weight * mean_entropy

## **<font style="color:blue">Model definition (CNN + RNN + CTC)</font>**
-------------------

In [20]:
class OCRModel(nn.Module):
    """CNN + bidirectional LSTM recognizer producing per-frame class logits for CTC.

    The CNN halves height and width twice, so an input of shape [B, 1, H, W]
    yields W // 4 time steps. Every time step is a column of the feature map
    (256 channels x H // 4 rows), projected to 512 features before the LSTM.

    Args:
        num_chars: Number of real characters; one extra class is added for the
            CTC blank.
    """

    def __init__(self, num_chars):
        super().__init__()
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # 32x128 -> 16x64
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),  # 16x64 -> 8x32
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Dropout(0.5)
        )
        self.linear = nn.Linear(256 * (IMG_HEIGHT // 4), 512)
        self.rnn = nn.LSTM(
            input_size=512,
            hidden_size=256,
            num_layers=3,
            bidirectional=True,
            batch_first=True,
            dropout=0.5
        )
        self.fc = nn.Linear(256 * 2, num_chars + 1)  # +1 for the CTC blank

    def forward(self, x):
        """Map images [B, 1, H, W] to temperature-scaled logits [W // 4, B, num_chars + 1]."""
        x = self.cnn(x)                          # [B, 256, H/4, W/4]
        x = x.permute(0, 3, 1, 2)                # [B, W/4, 256, H/4]: width becomes the time axis
        x = x.reshape(x.size(0), x.size(1), -1)  # [B, W/4, 256 * H/4]
        x = self.linear(x)
        x = self.rnn(x)[0]
        x = self.fc(x)
        # All W/4 frames are kept. CTC needs at least len(label) frames plus one
        # blank between repeated characters; cutting the sequence to the maximum
        # label length makes such labels impossible to align, and zero_infinity
        # then silently drops them from the loss.
        x = x / TEMPERATURE  # Apply temperature scaling
        x = x.permute(1, 0, 2)                   # [T, B, C], the layout F.ctc_loss expects
        return x

## **<font style="color:blue">Training</font>**
-------------------

In [21]:
def train_model(model, train_loader, criterion, optimizer, device, epoch, warmup_steps=WARMUP_STEPS):
    """Train `model` for one epoch and return the average training loss.

    Args:
        model: Network returning logits shaped [T, B, C].
        train_loader: Loader yielding (images, flat labels, label lengths).
        criterion: Loss called as criterion(log_probs, labels, input_lengths, label_lengths).
        optimizer: Optimizer whose learning rate is overwritten during warm-up.
        device: Device the batches are moved to.
        epoch: Zero-based epoch index; used for logging and for the step counter.
        warmup_steps: Number of steps over which the learning rate is ramped
            linearly up to LEARNING_RATE.

    Returns:
        Mean loss over the batches that were actually used for an update, or
        NaN when no batch was used (empty loader or every loss non-finite).
    """
    model.train()
    total_loss = 0.0
    batches_used = 0
    global_step = epoch * len(train_loader)
    for batch_idx, (imgs, labels, label_lengths) in enumerate(train_loader):
        imgs, labels = imgs.to(device), labels.to(device)
        label_lengths = label_lengths.to(device)

        # Linear warm-up: while active, this overrides any learning rate set elsewhere
        if global_step < warmup_steps:
            lr_scale = min(1.0, float(global_step + 1) / warmup_steps)
            for param_group in optimizer.param_groups:
                param_group['lr'] = LEARNING_RATE * lr_scale

        optimizer.zero_grad()
        outputs = model(imgs)
        outputs = outputs.log_softmax(2)

        # Every sample uses the full output sequence length
        batch_size = imgs.size(0)
        seq_length = outputs.size(0)
        input_lengths = torch.full((batch_size,), seq_length, dtype=torch.long).to(device)

        loss = criterion(outputs, labels, input_lengths, label_lengths)
        if not torch.isfinite(loss):
            print(f"Warning: NaN or Inf loss at batch {batch_idx}. Skipping...")
            continue

        loss.backward()
        grad_norm = torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=2.0)
        optimizer.step()
        total_loss += loss.item()
        batches_used += 1
        global_step += 1

        if batch_idx % 10 == 0:
            with torch.no_grad():
                pred_texts = beam_search_decode(outputs, idx_to_char)
                raw_outputs = outputs.argmax(2).cpu().numpy()[:3]
                blank_probs = outputs[:, :, 0].exp().mean(dim=0).mean().item()
                label_sequences = split_labels(labels, label_lengths)
                ground_truth = [''.join([idx_to_char.get(idx.item(), '') for idx in label_seq])
                                for label_seq in label_sequences[:3]]
                print(f"Batch {batch_idx}, Gradient norm: {grad_norm.item():.4f}")
                print(f"Epoch {epoch+1}, Batch {batch_idx}/{len(train_loader)}, Loss: {loss.item():.4f}")
                print(f"Avg Blank Probability: {blank_probs:.4f}")
                print(f"Sample predictions: {pred_texts[:3]}")
                print(f"Ground Truth (first 3): {ground_truth}")
                print(f"Raw outputs (first 3): {raw_outputs}")

    # Average over the batches that contributed; skipped batches added nothing to
    # total_loss, so dividing by len(train_loader) would under-report the loss
    # (and raise ZeroDivisionError for an empty loader).
    if batches_used == 0:
        return float('nan')
    return total_loss / batches_used

## **<font style="color:blue">Inference (prediction)</font>**
-------------------

In [22]:
def decode_prediction(output, idx_to_char, min_prob=0.0, verbose=False):
    """Greedy (best-path) CTC decoding.

    For every batch element the most likely class of each frame is taken,
    consecutive repeats are collapsed and blanks (index 0) are removed.

    A per-sequence percentile threshold is deliberately not used: with a strict
    comparison against the 75th percentile of the frame confidences at least
    three quarters of the frames are discarded, and a fully confident output
    (all confidences equal) decodes to nothing.

    Args:
        output: Tensor indexed as [T, B, C]; a softmax is applied over dim 2.
        idx_to_char: Mapping from class index to character.
        min_prob: Optional absolute confidence floor; frames whose top
            probability is below it do not emit a character. 0.0 disables it.
        verbose: When True, print the raw per-frame predictions and confidences.

    Returns:
        List with one decoded string per batch element ('<empty>' if nothing
        was decoded).
    """
    probs = output.detach().softmax(2)
    max_probs, preds = probs.max(dim=2)
    preds = preds.cpu().numpy()
    max_probs = max_probs.cpu().numpy()
    texts = []
    # Transpose so the loop runs over batch elements instead of time steps
    for i, (pred, prob) in enumerate(zip(preds.T, max_probs.T)):
        if verbose:
            print(f"Raw prediction {i} (pre-filter): {pred}, Max probs: {prob}")
        pred_text = []
        prev = -1
        for idx, p in zip(pred, prob):
            if idx != 0 and idx != prev and p >= min_prob:
                pred_text.append(idx_to_char.get(int(idx), ''))
            prev = idx
        decoded = ''.join(pred_text)
        texts.append(decoded if decoded else '<empty>')
    return texts

## **<font style="color:blue">Main launch</font>**
-------------------

In [23]:
if __name__ == "__main__":
    # Generate the synthetic dataset, then train with a label-length curriculum.
    if not font_files:
        print("Download some TrueType fonts (.ttf) and place them in the 'fonts' folder!")
    else:
        create_synthetic_dataset(NUM_SAMPLES)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    full_dataset = OCRDataset(image_dir=OUTPUT_DIR, labels_file=LABELS_FILE)
    if len(full_dataset) == 0:
        print("Dataset is empty! Check labels.txt or image directory.")
    else:
        model = OCRModel(num_chars=len(CHARSET)).to(device)
        criterion = CTCLossWithBlankPenalty(blank=0, zero_infinity=True, blank_penalty_weight=0.5, entropy_weight=ENTROPY_WEIGHT, label_smoothing=0.1)
        optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
        scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS, eta_min=1e-6)

        # Split into train and validation ONCE. Re-splitting every epoch would
        # move validation samples into the training set of later epochs, so the
        # validation loss would no longer measure generalization.
        shuffled_indices = list(range(len(full_dataset)))
        random.shuffle(shuffled_indices)
        train_count = int(0.8 * len(shuffled_indices))
        train_indices = shuffled_indices[:train_count]
        val_indices = shuffled_indices[train_count:]

        # The validation set is the same for every epoch so that the losses used
        # to pick the best checkpoint are comparable.
        val_loader = DataLoader(torch.utils.data.Subset(full_dataset, val_indices),
                                batch_size=BATCH_SIZE, shuffle=False, collate_fn=custom_collate_fn)

        # Curriculum boundaries scale with EPOCHS (15 and 20 for EPOCHS = 30), so
        # the later phases are reached for any number of epochs.
        short_phase_end = EPOCHS // 2
        medium_phase_end = (2 * EPOCHS) // 3

        best_loss = float('inf')
        for epoch in range(EPOCHS):
            # Curriculum: short labels first, then medium, then everything
            if epoch < short_phase_end:
                max_label_len = 5
            elif epoch < medium_phase_end:
                max_label_len = 7
            else:
                max_label_len = None

            if max_label_len is None:
                epoch_indices = train_indices
            else:
                # full_dataset.data holds (image name, label, label length)
                epoch_indices = [i for i in train_indices
                                 if full_dataset.data[i][2] <= max_label_len]

            if not epoch_indices:
                print(f"Epoch {epoch+1}/{EPOCHS}: no training samples available, skipping.")
                continue

            train_loader = DataLoader(torch.utils.data.Subset(full_dataset, epoch_indices),
                                      batch_size=BATCH_SIZE, shuffle=True, collate_fn=custom_collate_fn)

            loss = train_model(model, train_loader, criterion, optimizer, device, epoch)
            print(f"Epoch {epoch+1}/{EPOCHS}, Loss: {loss:.4f}")

            # Validation
            model.eval()
            val_loss = 0.0
            val_batches = 0
            with torch.no_grad():
                for imgs, labels, label_lengths in val_loader:
                    imgs, labels = imgs.to(device), labels.to(device)
                    label_lengths = label_lengths.to(device)
                    outputs = model(imgs)
                    outputs = outputs.log_softmax(2)
                    seq_length = outputs.size(0)
                    input_lengths = torch.full((imgs.size(0),), seq_length, dtype=torch.long).to(device)
                    val_loss += criterion(outputs, labels, input_lengths, label_lengths).item()
                    val_batches += 1

                if val_batches:
                    val_loss /= val_batches
                    # Sample predictions are taken from the last validation batch
                    pred_texts = beam_search_decode(outputs, idx_to_char)
                    label_sequences = split_labels(labels, label_lengths)
                    ground_truth = [''.join([idx_to_char.get(idx.item(), '') for idx in label_seq])
                                    for label_seq in label_sequences[:5]]
                    print(f"Validation Loss: {val_loss:.4f}")
                    print("Validation Predictions:", pred_texts[:5])
                    print("Ground Truth:", ground_truth)
                else:
                    # No validation data: never treat this epoch as an improvement
                    val_loss = float('inf')
                    print("Validation set is empty; skipping validation.")

            model.train()
            scheduler.step()
            print(f"Current Learning Rate: {optimizer.param_groups[0]['lr']}")

            if val_loss < best_loss:
                best_loss = val_loss
                torch.save(model.state_dict(), os.path.join(MODEL_DIR, 'best_ocr_model.pth'))

        torch.save(model.state_dict(), os.path.join(MODEL_DIR, 'final_ocr_model.pth'))

Generated 0/5120 images
Generated 100/5120 images
Generated 200/5120 images
Generated 300/5120 images
Generated 400/5120 images
Generated 500/5120 images
Generated 600/5120 images
Generated 700/5120 images
Generated 800/5120 images
Generated 900/5120 images
Generated 1000/5120 images
Generated 1100/5120 images
Generated 1200/5120 images
Generated 1300/5120 images
Generated 1400/5120 images
Generated 1500/5120 images
Generated 1600/5120 images
Generated 1700/5120 images
Generated 1800/5120 images
Generated 1900/5120 images
Generated 2000/5120 images
Generated 2100/5120 images
Generated 2200/5120 images
Generated 2300/5120 images
Generated 2400/5120 images
Generated 2500/5120 images
Generated 2600/5120 images
Generated 2700/5120 images
Generated 2800/5120 images
Generated 2900/5120 images
Generated 3000/5120 images
Generated 3100/5120 images
Generated 3200/5120 images
Generated 3300/5120 images
Generated 3400/5120 images
Generated 3500/5120 images
Generated 3600/5120 images
Generated 370

In [24]:
def zip_folder_with_shutil(source_folder, output_path):
    """Pack the contents of `source_folder` into a zip archive.

    Args:
        source_folder: Directory whose contents are archived.
        output_path: Archive path without extension; '.zip' is appended by
            shutil.make_archive.
    """
    shutil.make_archive(base_name=output_path, format='zip', root_dir=source_folder)

In [25]:
# Archive the generated backgrounds and the synthetic dataset; each archive is
# written next to its folder as <folder>.zip.
for folder_to_archive in ('/kaggle/working/backgrounds', '/kaggle/working/synthetic_data'):
    zip_folder_with_shutil(folder_to_archive, folder_to_archive)