# Start of the Pipeline

# Imports and PIP installs

In [1]:
import os
import numpy as np
import matplotlib.pyplot as plt
import cv2
from scipy.signal import find_peaks, savgol_filter
from scipy.ndimage import binary_closing
import secrets      # cryptographically‑secure RNG
import base64       # for compact ASCII/“number + letter” output
import uuid

# YOLO model prediction

In [2]:
# Input image.
# Given relative to the project root (the notebook's working directory), like the
# other "kaggle/..." paths used in this notebook, instead of an absolute path that
# only exists on one machine.
# NOTE(review): assumes the notebook is run from the folder that contains "kaggle/".
image_path = "kaggle/input/tests/a01-132x.png"

In [4]:
from ultralytics import YOLO

# Load the YOLO detection weights into the global `model`.
# NOTE: the name `model` is re-bound further down in this notebook (the
# handwritten-vs-printed classifier cell also assigns `model`), so this cell must
# be re-run before the YOLO cropping function is called again.
model = YOLO("kaggle/input/weights/last.pt")

In [5]:
def seperate_handwritten_printed_using_yolo(image_path, folder_name, target_class=9):
    """Run the YOLO detector on one image and save crops of one detected class.

    Reads the module-level ``model`` (expected to be the YOLO detector loaded
    earlier in the notebook).

    Args:
        image_path: Path of the page image to analyse.
        folder_name: Output root. Crops are written to
            ``<folder_name>/cropped_outputs`` under random ``uuid4`` file names
            (so repeated runs accumulate files), and the annotated detection
            image to ``<folder_name>/graph_outputs/<image file name>``.
        target_class: Class index whose boxes are cropped. Defaults to 9, the
            value that used to be hard-coded (presumably the "Text" class --
            confirm against ``model.names``).

    Raises:
        FileNotFoundError: If OpenCV cannot read ``image_path``.
    """
    # cv2.imread returns None instead of raising, so fail early with a clear error.
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Unable to load image {image_path}")
    img_height, img_width = image.shape[:2]
    base_name = os.path.basename(image_path)

    base_crop_folder = "cropped_outputs"
    base_graph_folder = "graph_outputs"

    def ensure_and_save_crop(box, root_folder):
        """Crop ``box`` (x1, y1, x2, y2) out of the page and write it as a JPEG."""
        x1, y1, x2, y2 = map(int, box)
        # Clamp to the image: a negative index would silently wrap around in
        # NumPy slicing, and an empty crop makes cv2.imwrite raise.
        x1, x2 = max(0, x1), min(img_width, x2)
        y1, y2 = max(0, y1), min(img_height, y2)
        if x2 <= x1 or y2 <= y1:
            print(f"Skipping empty detection box {tuple(map(int, box))} in {image_path}")
            return
        cropped = image[y1:y2, x1:x2]
        subfolder = os.path.join(folder_name, root_folder)
        os.makedirs(subfolder, exist_ok=True)
        filename = f"{uuid.uuid4().hex}.jpg"
        print(cv2.imwrite(os.path.join(subfolder, filename), cropped))

    results = model([image_path])

    for result in results:
        boxes = result.boxes
        cls = boxes.cls.cpu().numpy()
        xyxy = boxes.xyxy.cpu().numpy()

        # Save every box of the requested class as its own crop.
        for box_class, box in zip(cls, xyxy):
            if box_class == target_class:
                ensure_and_save_crop(box, base_crop_folder)

        # Save the page with all detections drawn on it.
        graph = result.plot()
        graph_folder = os.path.join(folder_name, base_graph_folder)
        os.makedirs(graph_folder, exist_ok=True)
        print(cv2.imwrite(os.path.join(graph_folder, base_name), graph))

In [6]:
# Run detection on the test page; crops and the annotated page are written under kaggle/output/.
seperate_handwritten_printed_using_yolo(image_path, "kaggle/output/")


0: 640x448 2 Page-headers, 1 Section-header, 2 Texts, 86.6ms
Speed: 5.0ms preprocess, 86.6ms inference, 3.0ms postprocess per image at shape (1, 3, 640, 448)
True
True
True


# Splitting lines from the detected image

In [9]:
# Base directories for the line-splitting stage (edit these to match your layout).
# Input: folder that is walked for images to split into lines.
base_input_dir = r"kaggle/output/cropped_outputs"
# Output: line images, saved as <base>/<parent folder name>/<image name>/<n>.png.
base_output_printed = r"kaggle/output/cropped_outputs_line"
# Output: projection-profile plots, saved as <base>/<parent folder name>/<image file name>.
base_graph_folder = r"kaggle/output/line_graphs"

In [10]:
import os
import numpy as np
import matplotlib.pyplot as plt
import cv2
from scipy.signal import find_peaks, savgol_filter
from scipy.ndimage import binary_closing

def auto_savgol_smooth(profile, polyorder=2, spacing_factor=None,
                       plot=True, plot_title="", save_path=None,
                       show_thresholds=False, high_thresh=None, low_thresh=None):
    """Smooth a 1-D projection profile with a Savitzky-Golay filter.

    The filter window is derived from the typical distance between the peaks of
    ``profile`` so that the smoothing scale follows the line spacing.

    Args:
        profile: 1-D sequence of per-row sums.
        polyorder: Polynomial order passed to ``savgol_filter``.
        spacing_factor: Window size as a multiple of the estimated peak spacing.
            When ``None`` it is derived from the spacing and clamped to [1.2, 2.0].
        plot: If true, draw the original and smoothed profile with the peaks.
        plot_title: Title of the figure (a default title is used when empty).
        save_path: If given (and ``plot`` is true), the figure is saved there.
        show_thresholds: If true, draw ``high_thresh`` / ``low_thresh`` as
            horizontal lines when they are not ``None``.
        high_thresh: Optional value for the "high" threshold line.
        low_thresh: Optional value for the "low" threshold line.

    Returns:
        Tuple ``(smoothed, spacing_factor)``: the filtered profile and the
        spacing factor that was actually used.

    Raises:
        ValueError: If fewer than two peaks are found in ``profile``.
    """
    profile = np.asarray(profile)

    peaks, _ = find_peaks(profile, distance=8)
    if len(peaks) < 2:
        raise ValueError("Not enough peaks detected to estimate line spacing.")

    # Inverse-distance weighting gives closely spaced peaks more influence on
    # the spacing estimate; eps guards the division.
    diffs = np.diff(peaks)
    eps = 1e-9
    weights = 1.0 / (diffs + eps)
    avg_spacing = int(np.round(np.sum(weights * diffs) / np.sum(weights)))

    # Dynamically estimate spacing_factor if not provided
    if spacing_factor is None:
        spacing_factor = min(max(1.2, avg_spacing / 20), 2.0)

    # Apply the lower bound first and force an odd length afterwards, so the
    # bound can no longer turn an odd window back into an even one.
    window_length = max(int(spacing_factor * avg_spacing), polyorder + 4)
    if window_length % 2 == 0:
        window_length += 1
    # Cap at the largest ODD length that fits into the profile. (The previous
    # cap, len - 1 for odd lengths and len - 2 for even ones, was always even.)
    n_samples = len(profile)
    largest_odd_window = n_samples if n_samples % 2 else n_samples - 1
    window_length = min(window_length, largest_odd_window)

    smoothed = savgol_filter(profile, window_length=window_length, polyorder=polyorder)

    if plot:
        fig, ax = plt.subplots(figsize=(14, 5))
        ax.plot(profile, label="Original", color="orange", alpha=0.6)
        ax.plot(smoothed, label=f"Smoothed (window={window_length})", color="blue")
        ax.plot(peaks, profile[peaks], "rx", label="Detected Peaks")

        if show_thresholds:
            if high_thresh is not None:
                ax.axhline(y=high_thresh, color="red", linestyle="--", label=f"High Thresh = {high_thresh:.2f}")
            if low_thresh is not None:
                ax.axhline(y=low_thresh, color="green", linestyle="--", label=f"Low Thresh = {low_thresh:.2f}")

        ax.set_title(plot_title or "Savitzky-Golay smoothing")
        ax.set_xlabel("Row Index")
        ax.set_ylabel("Sum of Pixel Intensities")
        ax.legend()
        ax.grid(True)
        fig.tight_layout()
        if save_path is not None:
            fig.savefig(save_path)
        # Always release the figure; this function is called once per image in a batch.
        plt.close(fig)

    return smoothed, spacing_factor



def _estimate_line_thresholds(smoothed):
    """Return ``(low_thresh, high_thresh)``, each the median of three heuristic estimates."""
    q1 = np.percentile(smoothed, 25)
    q3 = np.percentile(smoothed, 75)
    iqr = q3 - q1
    mean_val = np.mean(smoothed)
    min_val = np.min(smoothed)
    max_val = np.max(smoothed)

    # Three independent estimates: quartile based, mean based and range based.
    iqr_low = q1 + 0.2 * iqr
    iqr_high = iqr_low + 0.2 * iqr
    mean_low = mean_val * 0.25
    mean_high = mean_val * 0.5
    scaled_low = min_val + 0.1 * (max_val - min_val)
    scaled_high = min_val + 0.3 * (max_val - min_val)

    low_thresh = np.median([iqr_low, mean_low, scaled_low])
    high_thresh = np.median([iqr_high, mean_high, scaled_high])
    return low_thresh, high_thresh


def _find_line_ranges(smoothed, low_thresh, high_thresh):
    """Scan the smoothed profile and return ``(start_row, end_row)`` pairs, top to bottom."""
    total_rows = len(smoothed)
    line_ranges = []
    is_in_line = False
    start_row = 0
    relaxed_zone = int(0.8 * total_rows)

    # A line opens when the profile rises above the high threshold and closes
    # when it drops below the low threshold.
    for row, value in enumerate(smoothed):
        # Relax the high threshold in the bottom 20% of the image.
        current_high = high_thresh * 0.65 if row > relaxed_zone else high_thresh

        if value > current_high and not is_in_line:
            start_row = row
            is_in_line = True
        elif value < low_thresh and is_in_line:
            line_ranges.append((start_row, row))
            is_in_line = False

    # A line still open at the bottom edge runs to the end of the image.
    if is_in_line:
        line_ranges.append((start_row, total_rows))

    # Fallback: if no detected line reaches into the bottom 17% but that zone
    # still has signal above the low threshold, treat the zone as a final line.
    last_line_margin = int(total_rows * 0.17)
    end_threshold = total_rows - last_line_margin
    last_part_vals = smoothed[-last_line_margin:]

    if all(end < end_threshold for _, end in line_ranges):
        if np.max(last_part_vals) > low_thresh:
            line_ranges.append((end_threshold, total_rows))

    return line_ranges


def calculate_projection_profile_and_crop_lines_with_lines(image_path, folder_name):
    """Split one image into horizontal text-line crops using its row projection profile.

    Reads the module-level ``base_graph_folder`` and ``base_output_printed``.
    The profile plot is saved to ``<base_graph_folder>/<folder_name>/<image file name>``
    and the line crops to
    ``<base_output_printed>/<folder_name>/<image name without extension>/<n>.png``.

    Args:
        image_path: Path of the image to split.
        folder_name: Sub-folder name used below both output base directories.

    Returns:
        None. If the image cannot be read, an error is printed and nothing is written
        except the (possibly newly created) graph sub-folder.

    Raises:
        ValueError: Propagated from ``auto_savgol_smooth`` when the profile has
            fewer than two peaks.
    """
    base_name = os.path.basename(image_path)
    image_name_no_ext = os.path.splitext(base_name)[0]

    subfolder_graph = os.path.join(base_graph_folder, folder_name)
    os.makedirs(subfolder_graph, exist_ok=True)
    output_path = os.path.join(subfolder_graph, f"{base_name}")

    image = cv2.imread(str(image_path), cv2.IMREAD_GRAYSCALE)
    if image is None:
        print(f"Error: Unable to load image {image_path}")
        return

    # Otsu threshold, inverted; the row sums of the result form the profile.
    _, binary_image = cv2.threshold(image, 0, 255, cv2.THRESH_BINARY_INV + cv2.THRESH_OTSU)
    horizontal_projection = np.sum(binary_image, axis=1)

    # First pass only estimates the smoothing scale; no figure is drawn because
    # the plot is produced once the thresholds are known.
    smoothed, spacing_factor = auto_savgol_smooth(horizontal_projection, plot=False)

    # === Dynamic Thresholds ===
    low_thresh, high_thresh = _estimate_line_thresholds(smoothed)

    # Plot (and save) the profile together with the thresholds.
    smoothed, _ = auto_savgol_smooth(
        horizontal_projection,
        spacing_factor=spacing_factor,
        save_path=output_path,
        plot=True,
        show_thresholds=True,
        high_thresh=high_thresh,
        low_thresh=low_thresh
    )

    # === Line Detection (relaxed high threshold at the bottom, plus fallback) ===
    # Sort before refining so the neighbour-midpoint step below always works on
    # ranges in top-to-bottom order.
    line_ranges = sorted(
        _find_line_ranges(smoothed, low_thresh, high_thresh),
        key=lambda line_range: line_range[0],
    )

    # === Refine Borders ===
    # Pad the outer borders by 5 rows, then move each shared border to the
    # midpoint of the gap between neighbouring lines.
    if line_ranges:
        line_ranges[0] = (max(0, line_ranges[0][0] - 5), line_ranges[0][1])
        line_ranges[-1] = (line_ranges[-1][0], min(image.shape[0], line_ranges[-1][1] + 5))

    for i in range(1, len(line_ranges)):
        midpoint = (line_ranges[i - 1][1] + line_ranges[i][0]) // 2
        line_ranges[i - 1] = (line_ranges[i - 1][0], midpoint)
        line_ranges[i] = (midpoint, line_ranges[i][1])

    # === Save Cropped Lines ===
    subfolder_output = os.path.join(base_output_printed, folder_name, image_name_no_ext)
    os.makedirs(subfolder_output, exist_ok=True)

    for idx, (start, end) in enumerate(line_ranges, 1):
        cropped_line = image[start:end, :]
        if cropped_line.size == 0:
            # cv2.imwrite raises on an empty image; skip but keep the numbering stable.
            print(f"Skipping empty line range {idx} ({start}-{end}) in {image_path}")
            continue
        save_path = os.path.join(subfolder_output, f"{idx}.png")
        print(f'save_path = {save_path}')
        if not cv2.imwrite(save_path, cropped_line):
            print(f"Error: Unable to write {save_path}")

In [11]:
# === Batch processor ===
def process_all_images():
    """Split every image found under ``base_input_dir`` into text lines.

    Walks the directory tree, handles the image files of each folder in sorted
    order and passes the folder's own name on as the output sub-folder. A failure
    on one image is printed and does not stop the batch.
    """
    image_suffixes = ('.png', '.jpg', '.jpeg', '.bmp', '.tiff')

    for current_dir, _subdirs, filenames in os.walk(base_input_dir):
        parent_name = os.path.basename(current_dir)
        for name in sorted(filenames):
            if not name.lower().endswith(image_suffixes):
                continue
            full_path = os.path.join(current_dir, name)
            try:
                calculate_projection_profile_and_crop_lines_with_lines(full_path, parent_name)
            except Exception as e:
                print(f"Error processing {full_path}: {e}")

In [12]:
# Split every image found under base_input_dir into individual line images.
process_all_images()

save_path = kaggle/output/cropped_outputs_line\cropped_outputs\0dffb5d295124f7983a3ded7dbb8ab96\1.png
save_path = kaggle/output/cropped_outputs_line\cropped_outputs\0dffb5d295124f7983a3ded7dbb8ab96\2.png
save_path = kaggle/output/cropped_outputs_line\cropped_outputs\0dffb5d295124f7983a3ded7dbb8ab96\3.png


# Predicting whether an image is handwritten or printed

In [13]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from tqdm.notebook import tqdm
import torch.nn as nn

# Constants
IMG_SIZE = 128  # images are resized to IMG_SIZE x IMG_SIZE before classification
BATCH_SIZE = 32  # not referenced by the cells shown in this notebook
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # GPU when available, else CPU

# Define the same model class
class SimpleCNN(nn.Module):
    """Three convolution stages followed by a two-way linear classifier.

    Each stage is Conv2d(3x3, padding 1) -> ReLU -> Dropout. The first two
    stages end in a 2x2 max-pool, the last one in global average pooling.
    Layer positions inside ``net`` are kept fixed (0, 4 and 8 for the
    convolutions) because the saved weights are keyed by these indices.
    """

    def __init__(self):
        super().__init__()
        # (in_channels, out_channels, dropout probability) per stage
        stage_plan = ((3, 32, 0.25), (32, 64, 0.25), (64, 128, 0.5))
        final_stage = len(stage_plan) - 1

        layers = []
        for stage_idx, (in_ch, out_ch, drop_p) in enumerate(stage_plan):
            layers.append(nn.Conv2d(in_ch, out_ch, kernel_size=3, padding=1))
            layers.append(nn.ReLU())
            layers.append(nn.Dropout(drop_p))
            if stage_idx == final_stage:
                layers.append(nn.AdaptiveAvgPool2d((1, 1)))
            else:
                layers.append(nn.MaxPool2d(2))

        self.net = nn.Sequential(*layers)
        self.fc = nn.Linear(128, 2)

    def forward(self, x):
        """Return the two class logits for a batch of images."""
        features = self.net(x)
        flattened = torch.flatten(features, start_dim=1)
        return self.fc(flattened)

# Create model and load weights
model = SimpleCNN().to(DEVICE)
# NOTE(review): torch.load unpickles the file; only load checkpoints from a trusted source.
model.load_state_dict(torch.load("kaggle/input/weights/final_model_weights_HvP.pth", map_location=DEVICE))
# Inference only: eval() switches the Dropout layers off. Without it they keep
# randomly zeroing activations, so the same image can get different predictions.
model.eval()

# Test transforms
# NOTE(review): single-element mean/std are applied to 3-channel RGB input --
# confirm this matches the preprocessing used during training.
test_transform = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5], std=[0.5])
])

In [14]:
from PIL import Image
import torch

# --- Load and preprocess the image ---
def predict_HvP(image_path):
    """Classify one image as handwritten or printed.

    Reads the module-level ``model``, ``test_transform`` and ``DEVICE``.

    Args:
        image_path: Path of the image to classify.

    Returns:
        The predicted class index: 0 for "handwritten", 1 for "printed".
        The class name is also printed.
    """
    # Index -> name mapping.
    # NOTE(review): assumed to match the label order used during training -- confirm.
    class_names = ["handwritten", "printed"]

    # --- Load and preprocess the image ---
    image = Image.open(image_path).convert("RGB")
    image_tensor = test_transform(image).unsqueeze(0).to(DEVICE)  # Add batch dimension

    # --- Make prediction ---
    # The network contains Dropout layers; eval() disables them so the
    # prediction is deterministic. Calling it repeatedly is harmless.
    model.eval()
    with torch.no_grad():
        output = model(image_tensor)
        predicted_class = output.argmax(1).item()

    print(f"Predicted class label: {class_names[predicted_class]}")
    return predicted_class

# Predicting text from image using TrOCR

In [None]:
from PIL import Image
import torch
from transformers import TrOCRProcessor, VisionEncoderDecoderModel
import os

# Checkpoints of the two TrOCR recognisers (one for handwriting, one for print).
hand_written_model_id = "microsoft/trocr-large-handwritten"
printed_model_id = "microsoft/trocr-base-printed"

device = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Running on {device}")


def _load_trocr(model_id):
    """Load the processor and the model of one TrOCR checkpoint; the model is moved to ``device``."""
    processor = TrOCRProcessor.from_pretrained(model_id)
    ocr_model = VisionEncoderDecoderModel.from_pretrained(model_id).to(device)
    return processor, ocr_model


# Both recognisers are loaded once here and reused for every line image.
printed_processor, printed_model = _load_trocr(printed_model_id)
handwritten_processor, handwritten_model = _load_trocr(hand_written_model_id)

# OCR runner
def run_trOCR(model, processor, image_path):
    """Recognise the text of one image with the given TrOCR model/processor pair.

    Args:
        model: Model whose ``generate`` method produces the token ids.
        processor: Matching processor, used for preprocessing and decoding.
        image_path: Path of the image to read.

    Returns:
        The decoded text of the first (and only) sequence, special tokens removed.
    """
    rgb_image = Image.open(image_path).convert("RGB")
    encoded = processor(rgb_image, return_tensors="pt")
    token_ids = model.generate(encoded.pixel_values.to(device), max_new_tokens=1000)
    decoded_texts = processor.batch_decode(token_ids, skip_special_tokens=True)
    return decoded_texts[0]

Running on cpu


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
Some weights of VisionEncoderDecoderModel were not initialized from the model checkpoint at microsoft/trocr-base-printed and are newly initialized: ['encoder.pooler.dense.bias', 'encoder.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of VisionEncoderDecoderModel were not initialized from the model checkpoint at microsoft/trocr-large-handwritten and are newly initialized: ['encoder.pooler.dense.bias', 'encoder.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


# Final Prediction for the texts

In [16]:
from transformers import T5ForConditionalGeneration, T5Tokenizer
import torch

# Grammar-correction model (T5); tokenizer and model come from the same checkpoint.
_T5_CHECKPOINT = "vennify/t5-base-grammar-correction"
device = "cuda" if torch.cuda.is_available() else "cpu"
t5_tokenizer = T5Tokenizer.from_pretrained(_T5_CHECKPOINT)
t5_model = T5ForConditionalGeneration.from_pretrained(_T5_CHECKPOINT).to(device)

You are using the default legacy behaviour of the <class 'transformers.models.t5.tokenization_t5.T5Tokenizer'>. This is expected, and simply means that the `legacy` (previous) behavior will be used so nothing changes for you. If you want to use the new behaviour, set `legacy=False`. This should only be set if you understand what it means, and thoroughly read the reason why this was added as explained in https://github.com/huggingface/transformers/pull/24565


In [17]:
def restore_case_and_punctuation(text: str, max_length=128):
    """Run the T5 grammar-correction model on ``text`` and return the corrected string.

    Reads the module-level ``t5_tokenizer``, ``t5_model`` and ``device``.

    Args:
        text: Raw text (e.g. OCR output). It is lower-cased and prefixed with
            ``"grammar: "`` before being passed to the model.
        max_length: Maximum length passed to ``generate``.

    Returns:
        The decoded model output, or ``""`` when ``text`` is empty or contains
        only whitespace.
    """
    # Nothing to correct: do not let the model invent text from the bare prompt.
    if not text or not text.strip():
        return ""

    input_text = "grammar: " + text.lower()
    input_ids = t5_tokenizer.encode(input_text, return_tensors="pt").to(device)
    outputs = t5_model.generate(input_ids, max_length=max_length, num_beams=4, early_stopping=True)
    return t5_tokenizer.decode(outputs[0], skip_special_tokens=True)

In [18]:
# Path to input images
folder_dir = "kaggle/output/cropped_outputs_line/cropped_outputs/"


def _line_order_key(filename):
    """Order line images by number ("2.png" before "10.png"); other names follow alphabetically."""
    stem = os.path.splitext(filename)[0]
    if stem.isdecimal():
        return (0, int(stem), "")
    return (1, 0, filename)


# os.listdir returns entries in arbitrary order, so both levels are sorted
# explicitly to process the lines in reading order.
for paths in sorted(os.listdir(folder_dir)):

    input_dir = os.path.join(folder_dir, paths)
    if not os.path.isdir(input_dir):
        continue

    # Collect image file paths (anything that is not an image is ignored)
    file_paths = [
        os.path.join(input_dir, filename)
        for filename in sorted(os.listdir(input_dir), key=_line_order_key)
        if filename.lower().endswith(('.png', '.jpg', '.jpeg', '.bmp', '.tiff'))
    ]

    # Loop through and run OCR
    for path in file_paths:
        # Class 1 is "printed"; everything else goes to the handwriting model.
        if predict_HvP(path) == 1:
            raw_text = run_trOCR(printed_model, printed_processor, path)
        else:
            raw_text = run_trOCR(handwritten_model, handwritten_processor, path)

        fixed_text = restore_case_and_punctuation(raw_text)
        print("Fixed Text:", fixed_text)

Predicted class label: handwritten
Fixed Text: I love Kuet.
Predicted class label: handwritten
Fixed Text: It is a wonderful thing.


In [None]:
import pkg_resources
from symspellpy import SymSpell
from transformers import AutoTokenizer, AutoModelForTokenClassification, pipeline

# ---------------------------
# Setup Spell Correction (SymSpell)
# ---------------------------
sym_spell = SymSpell(max_dictionary_edit_distance=2, prefix_length=7)
dictionary_path = pkg_resources.resource_filename(
    "symspellpy", "frequency_dictionary_en_82_765.txt"
)
# load_dictionary reports a missing file through its return value rather than
# an exception; without this check spell correction would silently do nothing.
if not sym_spell.load_dictionary(dictionary_path, term_index=0, count_index=1):
    raise FileNotFoundError(f"SymSpell dictionary not found: {dictionary_path}")

# ---------------------------
# Setup Punctuation Restoration
# ---------------------------
# Dedicated names are used on purpose: assigning to the generic globals `model`
# and `tokenizer` here would re-bind `model`, which predict_HvP() reads, and
# break handwritten/printed classification once this cell has run.
punct_model_id = "oliverguhr/fullstop-punctuation-multilang-large"
punct_tokenizer = AutoTokenizer.from_pretrained(punct_model_id)
punct_model = AutoModelForTokenClassification.from_pretrained(punct_model_id)
punct_pipeline = pipeline("token-classification", model=punct_model, tokenizer=punct_tokenizer, aggregation_strategy="simple")

# ---------------------------
# Main Function
# ---------------------------
def restore_text(text: str) -> str:
    """Spell-correct ``text``, capitalise it and add predicted punctuation.

    Reads the module-level ``sym_spell`` and ``punct_pipeline``.

    Args:
        text: Raw input text.

    Returns:
        The corrected words joined by single spaces, each followed by the
        punctuation mark predicted for it (if any). Empty input gives ``""``.
    """
    import re  # local import: the notebook's import cells do not provide it

    # Step 1: Spell Correction
    suggestions = sym_spell.lookup_compound(text, max_edit_distance=2)
    corrected_text = suggestions[0].term if suggestions else text

    # Step 2: Case Restoration (simple heuristic)
    corrected_text = corrected_text.strip()
    if not corrected_text:
        return ""
    corrected_text = corrected_text[0].upper() + corrected_text[1:]
    corrected_text = corrected_text.replace(" i ", " I ")

    # Step 3: Punctuation Restoration
    # Each aggregated prediction covers a character span (`start`/`end`) of the
    # input and carries the predicted label in `entity_group`; its `word` field
    # is the source text, never a punctuation mark. Words are therefore matched
    # to predictions by character position, not by list index.
    # NOTE(review): assumes the label names the punctuation that follows the
    # word (per the model card) -- confirm.
    predictions = punct_pipeline(corrected_text)
    punctuation_marks = {".", ",", "?", "!"}

    restored_words = []
    for match in re.finditer(r"\S+", corrected_text):
        word = match.group()
        last_char = match.end() - 1
        label = None
        for prediction in predictions:
            start, end = prediction.get("start"), prediction.get("end")
            # Offsets can be missing (e.g. with a tokenizer that has no offset mapping).
            if start is None or end is None:
                continue
            if start <= last_char < end:
                label = prediction.get("entity_group")
                break
        if label in punctuation_marks:
            word += label
        restored_words.append(word)

    return " ".join(restored_words)

# ---------------------------
# Example Usage
# ---------------------------
# Smoke test: run restore_text on a deliberately misspelled, unpunctuated sentence.
print(restore_text("i havv a goood speling systm it work well"))


tokenizer_config.json:   0%|          | 0.00/406 [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development


config.json:   0%|          | 0.00/892 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


sentencepiece.bpe.model:   0%|          | 0.00/5.07M [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


tokenizer.json:   0%|          | 0.00/17.1M [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/239 [00:00<?, ?B/s]

Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


model.safetensors:   0%|          | 0.00/2.24G [00:00<?, ?B/s]

Device set to use cpu


I have a good spelling system it work we
