In [None]:
import os
import numpy as np
import matplotlib.pyplot as plt
import json
from PIL import Image

import tensorflow as tf
import torchvision.transforms as transforms
from torchvision.transforms import functional as F
import torch
from tensorflow.keras.models import load_model
import torch.nn as nn
import torch.optim as optim
from keras import layers
from keras.models import Model
from torch.utils.data import Dataset, DataLoader

import albumentations as A
from albumentations.pytorch import ToTensorV2

  check_for_updates()


In [None]:
import os
import json
import random
import shutil

# Paths to the COCO dataset folders
images_folder = '/content/drive/MyDrive/Colab Notebooks/OCR/Dataset/train2017'  # Replace with the path to the train2017 folder
annotations_file = '/content/drive/MyDrive/Colab Notebooks/OCR/Dataset/annotations/captions_train2017.json'  # Replace with the path to the annotation file

# Destination directories, one per split
train_folder = '/content/drive/MyDrive/Colab Notebooks/OCR/Dataset/train'
val_folder = '/content/drive/MyDrive/Colab Notebooks/OCR/Dataset/val'
test_folder = '/content/drive/MyDrive/Colab Notebooks/OCR/Dataset/test'

for folder in (train_folder, val_folder, test_folder):
    os.makedirs(folder, exist_ok=True)

# Load the COCO annotations JSON file
with open(annotations_file, 'r', encoding='utf-8') as f:
    annotations_data = json.load(f)

# Shuffle the image IDs with a fixed seed so that re-running the notebook
# reproduces the same train/val/test membership (an unseeded shuffle would put
# different images in each split on every run).
SPLIT_SEED = 42
image_ids = [image['id'] for image in annotations_data['images']]
random.Random(SPLIT_SEED).shuffle(image_ids)

# 70% train / 15% validation / 15% test; the boundaries are computed once.
n_images = len(image_ids)
train_end = int(0.7 * n_images)
val_end = int(0.85 * n_images)

train_ids = image_ids[:train_end]
val_ids = image_ids[train_end:val_end]
test_ids = image_ids[val_end:]

# Helper function to copy images to the corresponding folders
def copy_images(ids, source_folder, target_folder, images=None):
    """Copy the image files that belong to ``ids`` into ``target_folder``.

    Args:
        ids: Iterable of image IDs to copy.
        source_folder: Directory that currently holds the image files.
        target_folder: Directory the files are copied into (must exist).
        images: Optional list of image records (dicts with ``'id'`` and
            ``'file_name'``). Defaults to ``annotations_data['images']``.

    IDs without a matching record (or with an empty file name) are skipped.
    Errors raised by ``shutil.copy`` (e.g. a missing source file) propagate.
    """
    if images is None:
        images = annotations_data['images']

    # Build the id -> file name lookup once instead of rescanning the whole
    # image list for every ID (which is quadratic in the dataset size).
    # setdefault keeps the first record for an ID, like the previous linear scan.
    file_name_by_id = {}
    for img in images:
        file_name_by_id.setdefault(img['id'], img['file_name'])

    for img_id in ids:
        img_file = file_name_by_id.get(img_id)
        if img_file:
            src_path = os.path.join(source_folder, img_file)
            dst_path = os.path.join(target_folder, img_file)
            shutil.copy(src_path, dst_path)

# Populate every split directory from the shared source folder
# (same order as before: train, then val, then test).
for split_ids, split_folder in (
    (train_ids, train_folder),
    (val_ids, val_folder),
    (test_ids, test_folder),
):
    copy_images(split_ids, images_folder, split_folder)

# Create new annotation files for each split (train, val, test)
def create_split_annotation_file(ids, annotations_data, split_name):
    """Write ``{split_name}_annotations.json`` restricted to the given image IDs.

    Args:
        ids: Iterable of image IDs that belong to the split.
        annotations_data: Loaded COCO-style dict with ``'images'`` and
            ``'annotations'`` lists; ``'categories'`` is copied when present.
        split_name: Prefix of the output file, written to the current
            working directory.
    """
    # Set membership is O(1); testing against the raw list made the two
    # filters below quadratic in the dataset size.
    id_set = set(ids)

    new_annotations = {
        'images': [img for img in annotations_data['images'] if img['id'] in id_set],
        'annotations': [anno for anno in annotations_data['annotations'] if anno['image_id'] in id_set],
    }
    # Not every COCO annotation file carries a 'categories' section, so only
    # copy it when it exists instead of raising KeyError.
    if 'categories' in annotations_data:
        new_annotations['categories'] = annotations_data['categories']

    with open(f'{split_name}_annotations.json', 'w') as f:
        json.dump(new_annotations, f)

# Write one annotation file per split (train, val, test — in that order)
for split_label, split_image_ids in (
    ('train', train_ids),
    ('val', val_ids),
    ('test', test_ids),
):
    create_split_annotation_file(split_image_ids, annotations_data, split_label)

print(f"COCO dataset has been split into {train_folder}, {val_folder}, and {test_folder}.")


KeyboardInterrupt: 

In [None]:
# Locations used by the COCO-Text preprocessing cells.
# NOTE(review): annotations_path points at the COCO *captions* file, while the
# cell that loads it reads COCO-Text style "anns"/"imgs" entries — confirm
# that this is the intended annotation file.
annotations_path = "/content/drive/MyDrive/Colab Notebooks/OCR/Dataset/annotations/captions_train2017.json"
output_dir = "COCO-Text/preprocessed_images/"
image_dir_train, image_dir_val = (
    "COCO-Text/images/train2014/",
    "COCO-Text/images/val2014/",
)

# Make sure the folder for the preprocessed crops exists before anything is written to it
os.makedirs(output_dir, exist_ok=True)

In [None]:
# Load annotations
with open(annotations_path, "r", encoding="utf-8") as f:
    cocotext = json.load(f)

# "anns" maps annotation id -> annotation, "imgs" maps image id -> image record.
annotations = cocotext["anns"]
images = cocotext["imgs"]

# Filter legible english text.
# The transcription is read from "text" when present and otherwise from
# "utf8_string" (presumably the key COCO-Text uses — verify against the file);
# annotations without a transcription get an empty string.
filtered_annotations = [
    {
        "image_id": ann["image_id"],
        "bbox": ann["bbox"],
        "text": ann["text"] if "text" in ann else ann.get("utf8_string", ""),
    }
    for ann in annotations.values()
    if ann.get("legibility") == "legible" and ann.get("language") == "english"
]

In [None]:
def normalize_bbox(bbox, img_width, img_height):
    """Convert an ``[x, y, w, h]`` pixel box into fractional corner coordinates.

    Args:
        bbox: Sequence ``(x, y, w, h)`` in pixels.
        img_width: Width used to scale the horizontal coordinates.
        img_height: Height used to scale the vertical coordinates.

    Returns:
        ``[x_min, y_min, x_max, y_max]`` with each value divided by the
        matching image dimension.
    """
    left, top, box_width, box_height = bbox
    right = left + box_width
    bottom = top + box_height
    return [
        left / img_width,
        top / img_height,
        right / img_width,
        bottom / img_height,
    ]

In [None]:
# Mean and std normalization (per RGB channel), used by the normalisation step
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
desired_size = 224

# NOTE(review): assumes every annotated image lives in the train2014 folder —
# confirm, or pick the folder per image if some come from val2014.
image_dir = image_dir_train

# Crop every legible text region out of its source image and save it.
for ann_index, ann in enumerate(filtered_annotations):
    image_id = ann["image_id"]

    # JSON object keys are strings, so look the image record up by str(id)
    # first and fall back to the raw id for already-converted mappings.
    image_info = images.get(str(image_id)) or images.get(image_id)
    file_name = image_info["file_name"] if image_info else f"{image_id}.jpg"
    image_path = os.path.join(image_dir, file_name)

    # Open inside a context manager so the file handle is released each
    # iteration; convert() returns an independent in-memory copy.
    with Image.open(image_path) as source_image:
        image = source_image.convert("RGB")

    width, height = image.size
    bbox = ann["bbox"]  # this annotation's own box, not the first one in the list
    normalized_bbox = normalize_bbox(bbox, width, height)

    # Clamp the box to the image and round to whole pixels before cropping.
    x, y, w, h = bbox
    left = max(0, int(round(x)))
    top = max(0, int(round(y)))
    right = min(width, int(round(x + w)))
    bottom = min(height, int(round(y + h)))
    if right <= left or bottom <= top:
        # Degenerate box: nothing to save.
        continue

    cropped_region = image.crop((left, top, right, bottom))

    # Include the annotation index in the file name: one image can hold
    # several text regions and they must not overwrite each other.
    output_path = os.path.join(output_dir, f"{image_id}_{ann_index}.jpg")
    cropped_region.save(output_path)

In [None]:
  image_array = np.array(image) / 255.0  # Normalize to [0, 1]
  image_array = (image_array - mean) / std

In [None]:
# Augmentation pipeline: the steps run in the order listed and the last one
# converts the result into a tensor.
augmentation_steps = [
    A.RandomBrightnessContrast(p=0.2),
    A.Rotate(limit=15, p=0.5),
    A.Resize(224, 224),
    ToTensorV2(),
]
augmentations = A.Compose(augmentation_steps)

# Run the pipeline on the current image and keep the transformed result
image_as_array = np.array(image)
augmented = augmentations(image=image_as_array)
image_augmented = augmented["image"]


In [None]:
# CRNN Model Definition
class CRNN(nn.Module):
    """Convolutional-recurrent network that emits per-timestep class scores.

    Three conv + ReLU + 2x2 max-pool stages shrink the input by a factor of 8
    in both spatial dimensions and produce 256 feature channels. Each remaining
    image column becomes one timestep of a bidirectional LSTM, followed by a
    linear layer that maps every timestep to ``num_classes`` scores.

    Args:
        input_channels: Number of channels of the input images.
        hidden_units: Hidden size of the LSTM (per direction).
        num_classes: Number of output classes per timestep.
    """

    def __init__(self, input_channels=1, hidden_units=256, num_classes=37):
        super(CRNN, self).__init__()

        # Convolutional Layers (CNN)
        self.conv1 = nn.Conv2d(input_channels, 64, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1)

        # Maxpooling (shared by all three stages)
        self.maxpool = nn.MaxPool2d(kernel_size=2, stride=2)

        # Recurrent Layer (LSTM): consumes one 256-dim feature vector per column
        self.rnn = nn.LSTM(256, hidden_units, bidirectional=True, batch_first=True)

        # Fully connected layer (for output); *2 because the LSTM is bidirectional
        self.fc = nn.Linear(hidden_units * 2, num_classes)

    def forward(self, x):
        """Map ``[batch, channels, height, width]`` images to
        ``[batch, width // 8, num_classes]`` raw scores (no softmax applied)."""
        # Apply convolutional layers
        x = self.maxpool(torch.relu(self.conv1(x)))
        x = self.maxpool(torch.relu(self.conv2(x)))
        x = self.maxpool(torch.relu(self.conv3(x)))  # [batch, 256, H/8, W/8]

        # Collapse the height axis by averaging so that every column is exactly
        # 256 features, matching the LSTM input size for any input height.
        # (Flattening height into the feature axis produced H/8 * 256 features
        # and crashed the LSTM whenever H/8 != 1.) For H/8 == 1 the mean is the
        # identity, so results for such inputs are unchanged.
        x = x.mean(dim=2)                        # [batch, 256, W/8]
        x = x.permute(0, 2, 1).contiguous()      # [batch, W/8, 256]

        # Apply LSTM (bidirectional)
        x, _ = self.rnn(x)

        # Apply fully connected layer
        x = self.fc(x)

        return x

# Select the compute device once; the training and evaluation cells move their
# batches to `device`, so the model has to live there as well.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Example: Instantiate the model
# num_classes depends on your dataset: here 26 letters + 10 digits + 1 CTC blank = 37
model = CRNN(input_channels=3, hidden_units=256, num_classes=37).to(device)


In [None]:
# Step size for the optimizer
learning_rate = 1e-3

class OCRDataset(Dataset):
    """Dataset of (image, text label) pairs read from image files on disk.

    Args:
        image_paths: Sequence of paths to the image files.
        labels: Sequence of text transcriptions, aligned with ``image_paths``.
        transform: Optional callable applied to each loaded RGB image.
    """

    def __init__(self, image_paths, labels, transform=None):
        self.image_paths = image_paths
        self.labels = labels
        self.transform = transform

    def __len__(self):
        """Number of samples, i.e. the number of image paths."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``."""
        # Load the image inside a context manager so the underlying file is
        # closed right away instead of whenever the object gets collected;
        # convert() returns an independent, fully loaded copy.
        with Image.open(self.image_paths[idx]) as source:
            image = source.convert('RGB')

        # Apply transformations (e.g., resizing, normalization)
        if self.transform:
            image = self.transform(image)

        # Get the label (text transcription)
        label = self.labels[idx]

        return image, label

# Example preprocessing applied to every image: resize to a fixed size
# (height=32, width=128), convert to a tensor, then normalize each channel
# with the ImageNet mean/std.
preprocessing_steps = [
    transforms.Resize((32, 128)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
]
transform = transforms.Compose(preprocessing_steps)


In [None]:
# Loss function (CTC loss). With the default arguments class index 0 is the
# CTC blank symbol.
criterion = nn.CTCLoss()

# Optimizer (Adam). The step-size keyword of torch.optim.Adam is `lr`;
# passing `learning_rate=` raises TypeError.
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [None]:
# Training loop
num_epochs = 10  # Number of training epochs

# Alphabet used to turn text labels into CTC targets. Index 0 is reserved for
# the CTC blank (the nn.CTCLoss default), so characters map to 1..len(characters).
characters = 'abcdefghijklmnopqrstuvwxyz0123456789'
char_to_index = {ch: i + 1 for i, ch in enumerate(characters)}

# Train on whichever device the model currently lives on.
device = next(model.parameters()).device

# NOTE(review): `dataloader` is expected to be a DataLoader over an OCRDataset
# created elsewhere (batches of image tensors plus text labels) — confirm.
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0

    for images, labels in dataloader:
        # Move images to the model's device; labels are plain strings and are
        # encoded below instead of being moved.
        images = images.to(device)

        # Encode each label as a list of class indices. Labels are lower-cased
        # and characters outside the alphabet are dropped.
        encoded_labels = [
            [char_to_index[c] for c in label.lower() if c in char_to_index]
            for label in labels
        ]
        # CTC accepts all targets concatenated into one 1-D tensor plus the
        # length of each individual target.
        targets = torch.tensor(
            [index for sequence in encoded_labels for index in sequence],
            dtype=torch.long,
        )
        target_lengths = torch.tensor(
            [len(sequence) for sequence in encoded_labels], dtype=torch.long
        )

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass: [batch, time, classes]
        outputs = model(images)

        # Every sample in the batch produces the same number of timesteps
        input_lengths = torch.full((images.size(0),), outputs.size(1), dtype=torch.long)

        # nn.CTCLoss expects log-probabilities shaped [time, batch, classes]
        log_probs = outputs.log_softmax(2).permute(1, 0, 2)

        # Compute CTC loss
        loss = criterion(log_probs, targets, input_lengths, target_lengths)

        # Backward pass and optimization
        loss.backward()
        optimizer.step()

        # Accumulate statistics
        running_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss/len(dataloader)}")


In [None]:
# Location on Google Drive where the trained model is stored
model_path = "/content/drive/MyDrive/Colab Notebooks/OCR/model.keras"

In [None]:
# `model` is a torch.nn.Module, which has no Keras-style .save(); persist its
# learned weights (the state dict) instead.
torch.save(model.state_dict(), model_path)

In [None]:
# Rebuild the architecture and restore the saved weights. Keras' load_model
# cannot read a PyTorch model; the constructor arguments must match the ones
# used for the model whose weights were saved.
trained_model = CRNN(input_channels=3, hidden_units=256, num_classes=37)
# map_location="cpu" lets a checkpoint written on a GPU load on any machine.
trained_model.load_state_dict(torch.load(model_path, map_location="cpu"))
trained_model.eval()

In [None]:
# PyTorch has no Keras-style compile(loss=..., optimizer=..., metrics=...)
# step (the original call also repeated the `metrics` keyword, which is a
# SyntaxError). The equivalent configuration is a loss object and an
# optimizer bound to the model parameters; accuracy is computed explicitly
# during evaluation rather than registered as a metric.
criterion = nn.CTCLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
def decode_ctc_output(output, characters='abcdefghijklmnopqrstuvwxyz0123456789', blank=0):
    """Greedy-decode the first sequence of a CTC output into text.

    Args:
        output: Tensor of per-timestep class scores, ``[batch, time, classes]``.
            Only the first item of the batch is decoded.
        characters: Alphabet; the non-blank classes map onto it in order.
        blank: Index of the CTC blank class (0 is the ``nn.CTCLoss`` default).

    Returns:
        The decoded string: repeated consecutive classes are collapsed and
        blanks are removed.
    """
    _, max_indices = torch.max(output, dim=2)  # best class per timestep
    decoded_text = []

    prev_idx = None
    for char_idx in max_indices[0].tolist():  # walk over the sequence length
        # Emit a character only when the class changes and is not the blank.
        if char_idx != prev_idx and char_idx != blank:
            # Classes above the blank are shifted by one relative to `characters`.
            char_pos = char_idx - 1 if char_idx > blank else char_idx
            decoded_text.append(characters[char_pos])
        prev_idx = char_idx

    return ''.join(decoded_text)

# Example inference
# Put the input on the same device as the model; sending it to CUDA
# unconditionally fails when the model itself is still on the CPU.
inference_device = next(model.parameters()).device

with Image.open('path/to/cropped_image.jpg') as image_file:
    image = image_file.convert('RGB')
image = transform(image).unsqueeze(0).to(inference_device)  # add the batch dimension

# Predict
model.eval()
with torch.no_grad():
    output = model(image)
    decoded_text = decode_ctc_output(output)
    print(f"Predicted text: {decoded_text}")


In [None]:
def _greedy_ctc_decode(outputs, characters, blank=0):
    """Greedy-decode every sequence of a ``[batch, time, classes]`` tensor into a list of strings."""
    texts = []
    for best_path in outputs.argmax(dim=2).tolist():
        chars = []
        previous = None
        for class_idx in best_path:
            # Collapse repeats and drop blanks; classes above the blank are
            # shifted by one relative to `characters`.
            if class_idx != blank and class_idx != previous:
                chars.append(characters[class_idx - 1 if class_idx > blank else class_idx])
            previous = class_idx
        texts.append(''.join(chars))
    return texts


def evaluate_model(model, dataloader, criterion, characters='abcdefghijklmnopqrstuvwxyz0123456789'):
    """Evaluates the model and computes the loss and character accuracy.

    Args:
        model: Network returning ``[batch, time, classes]`` scores.
        dataloader: Iterable of ``(images, labels)`` batches, where ``labels``
            is a sequence of text strings.
        criterion: CTC loss; its ``blank`` attribute (default 0) selects the
            blank class.
        characters: Alphabet; the non-blank classes map onto it in order.

    Returns:
        ``(avg_loss, char_accuracy)``: mean loss per batch and the percentage
        of label characters matched position-by-position by the greedy
        decoding. Both are 0.0 when there is nothing to evaluate.

    Label characters that are not in ``characters`` (after also trying their
    lower-case form) are ignored for both the loss and the accuracy.
    """
    model.eval()  # Set the model to evaluation mode
    blank = getattr(criterion, "blank", 0)

    # Evaluate on whichever device the model lives on (CPU if it has no parameters).
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    # Use tqdm for progress reporting only when the notebook has it in scope;
    # otherwise iterate over the dataloader directly.
    progress = globals().get("tqdm")
    batches = progress(dataloader, desc="Evaluating") if callable(progress) else dataloader

    # First occurrence wins, mirroring str.index.
    char_position = {}
    for position, ch in enumerate(characters):
        char_position.setdefault(ch, position)

    total_loss = 0.0
    num_batches = 0
    correct_characters = 0
    total_characters = 0

    with torch.no_grad():
        for images, labels in batches:
            images = images.to(device)

            # Forward pass
            outputs = model(images)

            # All outputs in a batch share the same number of timesteps
            input_lengths = torch.full((outputs.size(0),), outputs.size(1), dtype=torch.long)

            # Encode the labels: concatenated class indices plus per-label lengths
            target_texts = []
            flat_targets = []
            lengths = []
            for label in labels:
                positions = []
                for c in label:
                    position = char_position.get(c, char_position.get(c.lower()))
                    if position is not None:
                        positions.append(position)
                target_texts.append(''.join(characters[p] for p in positions))
                # Shift class indices past the blank so they never collide with it
                flat_targets.extend(p if p < blank else p + 1 for p in positions)
                lengths.append(len(positions))
            targets = torch.tensor(flat_targets, dtype=torch.long)
            target_lengths = torch.tensor(lengths, dtype=torch.long)

            # nn.CTCLoss expects log-probabilities shaped [time, batch, classes]
            log_probs = outputs.log_softmax(2).permute(1, 0, 2)
            loss = criterion(log_probs, targets, input_lengths, target_lengths)
            total_loss += loss.item()
            num_batches += 1

            # Decode one prediction per sample in the batch
            predicted_texts = _greedy_ctc_decode(outputs, characters, blank)

            # Character-level accuracy, compared position by position
            for pred_text, true_text in zip(predicted_texts, target_texts):
                correct_characters += sum(1 for p, t in zip(pred_text, true_text) if p == t)
                total_characters += len(true_text)

    # Compute final metrics (guarding against empty inputs)
    avg_loss = total_loss / num_batches if num_batches else 0.0
    char_accuracy = correct_characters / total_characters * 100 if total_characters else 0.0

    print(f"Validation Loss: {avg_loss:.4f}")
    print(f"Character Accuracy: {char_accuracy:.2f}%")

    return avg_loss, char_accuracy

# Define test dataset and dataloader
test_image_paths = ["/content/drive/MyDrive/path/to/test_image1.jpg", "/content/drive/MyDrive/path/to/test_image2.jpg"]
test_labels = ["text1", "text2"]  # Corresponding ground-truth labels for test images

test_dataset = OCRDataset(test_image_paths, test_labels, transform=transform)
test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Define characters and criterion.
# nn.CTCLoss holds no parameters or buffers, so it needs no device placement.
characters = 'abcdefghijklmnopqrstuvwxyz0123456789'
criterion = nn.CTCLoss()

# Evaluate the model
evaluate_model(model, test_dataloader, criterion, characters)

In [None]:
# Printing the summary of model.
# torch.nn.Module has no Keras-style .summary(); printing the module lists its
# layers, and the parameter count is computed explicitly.
print(trained_model)
num_parameters = sum(p.numel() for p in trained_model.parameters())
print(f"Total parameters: {num_parameters:,}")