# Load Data

This notebook outlines the end-to-end preprocessing workflow for preparing the OCR dataset for training. The goal is to ensure the data is clean, balanced, and optimized for model development.

In [2]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
%matplotlib inline

import os
import math
import cv2
from typing import Optional, Literal, Tuple, List

In [3]:
# ----- Load Full Dataset -----
data = pd.read_parquet(r"data\raw_data\train_raw.parquet")

# ----- Split By Source -----
printed = data[data["source"] == "printed"]
written = data[data["source"] == "handwritten"]

# ----- Pre-sampling size -----
print("Printed Size :", len(printed))
print("Handwritten Size :", len(written))

Printed Size : 1733904
Handwritten Size : 6482


## Data Preprocessing


Based on the exploratory data analysis performed on this dataset, we will apply the following preprocessing techniques:
- Downsampling (printed data)
- Conversion to Grayscale
- Resizing to Fixed Dimensions
- Noise Removal (conditional)
- Binarization (conditional)
- Normalization
- Padding and Alignment
- Augmentation (handwritten data)

### Down Sampling

We will down-sample the printed data to 100,000 samples and augment the handwritten data up to 100,000 samples. This corrects the class imbalance (1,733,904 printed vs. 6,482 handwritten samples) while preserving enough data for training.

In [8]:
# ----- Downsample Printed Data -----
printed = printed.sample(n = 100000, random_state=42)

# ----- Post-sampling size -----
print("Printed Size post-sampling:", len(printed))

Printed Size post-sampling: 100000
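
As a small illustration of the down-sampling step, the sketch below runs the same `sample` call on a toy frame. The toy data and the `downsample` helper are assumptions made for this example and are not part of the notebook; the helper caps `n` because `DataFrame.sample` raises an error when asked for more rows than exist without replacement.

```python
import pandas as pd

# Toy frame standing in for the real dataset (values are made up for illustration)
toy = pd.DataFrame({
    "source": ["printed"] * 10 + ["handwritten"] * 3,
    "text": [f"sample_{i}" for i in range(13)],
})

def downsample(df: pd.DataFrame, n: int, seed: int = 42) -> pd.DataFrame:
    # DataFrame.sample raises ValueError when n exceeds len(df) without replacement,
    # so cap n at the number of available rows
    return df.sample(n=min(n, len(df)), random_state=seed)

toy_printed = toy[toy["source"] == "printed"]
toy_written = toy[toy["source"] == "handwritten"]

# Reduce the majority class to the size of the minority class
balanced_printed = downsample(toy_printed, n=len(toy_written))
same_again = downsample(toy_printed, n=len(toy_written))

print(len(balanced_printed), len(toy_written))
print(balanced_printed.index.equals(same_again.index))
```

With a fixed `random_state`, repeated runs select the same rows, which keeps the down-sampled subset reproducible.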


### Preprocessing Functions

#### Extract Image from Byte Format
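This step extracts the raw image bytes from each record, which may be stored either directly as bytes or as a dict with a `bytes` key, and decodes them into a NumPy array. Unsupported inputs and undecodable bytes raise an explicit error instead of failing later in the pipeline.  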


In [10]:
# ----- Obtain Raw Bytes -----
def get_bytes(x) -> bytes:
    if isinstance(x, dict) and "bytes" in x:
        v = x["bytes"]
        if isinstance(v, (bytes, bytearray, memoryview)):
            return bytes(v)
    if isinstance(x, (bytes, bytearray, memoryview)):
        return bytes(x)
    raise TypeError("Image field must be bytes or dict containing key 'bytes' with bytes")

# ----- Decode Compressed Image Bytes -----
def decode_image_cv2(image_bytes: bytes) -> np.ndarray:
    buf = np.frombuffer(image_bytes, dtype=np.uint8)
    img = cv2.imdecode(buf, cv2.IMREAD_UNCHANGED)
    if img is None:
        raise ValueError("Failed to decode image bytes")
    return img
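
To make the accepted input shapes concrete, the sketch below repeats `get_bytes` so that it runs on its own and calls it with raw bytes, a `bytearray`, and a dict. The `path` key and the placeholder payload are assumptions for illustration; they are not confirmed by the dataset.

```python
def get_bytes(x) -> bytes:
    # Same logic as the helper above, repeated so this snippet is self-contained
    if isinstance(x, dict) and "bytes" in x:
        v = x["bytes"]
        if isinstance(v, (bytes, bytearray, memoryview)):
            return bytes(v)
    if isinstance(x, (bytes, bytearray, memoryview)):
        return bytes(x)
    raise TypeError("Image field must be bytes or dict containing key 'bytes' with bytes")

payload = b"\x89PNG"  # placeholder bytes, not a valid image

from_raw = get_bytes(payload)
from_bytearray = get_bytes(bytearray(payload))
from_dict = get_bytes({"bytes": payload, "path": "sample.png"})  # "path" is hypothetical

# A dict whose "bytes" value is not bytes-like (for example None) is rejected
try:
    get_bytes({"bytes": None})
    rejected = False
except TypeError:
    rejected = True

print(from_raw == from_bytearray == from_dict, rejected)
```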

#### Grayscale Conversion  
This step converts input images into grayscale format to simplify processing and reduce computational cost.  
- If the image is already 2D (grayscale), it is returned as-is.  
- If the image has 3 channels (BGR) or 4 channels (BGRA), OpenCV is used to convert it to grayscale.  
- Inputs with any other channel layout reach the fallback branch at the end of the function.  

Grayscale images are essential for tasks like blur detection, thresholding, and OCR preprocessing since color information is often unnecessary.

In [11]:
# ----- Convert to Grayscale -----
def to_grayscale(img: np.ndarray) -> np.ndarray:
    if img.ndim == 2:
        return img
    if img.ndim == 3:
        if img.shape[2] == 3:
            return cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
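        if img.shape[2] == 4:
            return cv2.cvtColor(img, cv2.COLOR_BGRA2GRAY)
        if img.shape[2] == 1:
            # Single-channel image stored as (H, W, 1): drop the channel axis
            return img[:, :, 0]

    # ----- Fallback -----
    # COLOR_BGR2GRAY only accepts 3- or 4-channel input, so fail with a clear message
    raise ValueError(f"Unsupported image shape for grayscale conversion: {img.shape}")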
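
For intuition about what the BGR-to-grayscale conversion computes, here is a NumPy-only sketch of the standard luminance weighting, Y = 0.299 R + 0.587 G + 0.114 B. The helper name `bgr_to_gray_numpy` is hypothetical, and because OpenCV uses its own internal arithmetic, results may differ from `cv2.cvtColor` by a rounding step.

```python
import numpy as np

def bgr_to_gray_numpy(img_bgr: np.ndarray) -> np.ndarray:
    # Weights for the B, G, R channels, in OpenCV's channel order
    weights = np.array([0.114, 0.587, 0.299], dtype=np.float64)
    # Any alpha channel is ignored by slicing to the first three channels
    gray = img_bgr[..., :3].astype(np.float64) @ weights
    return np.clip(np.rint(gray), 0, 255).astype(np.uint8)

pixels = np.array([[[255, 255, 255],   # white
                    [255, 0, 0],       # pure blue in BGR order
                    [0, 255, 0],       # pure green
                    [0, 0, 255]]],     # pure red in BGR order
                  dtype=np.uint8)

gray_pixels = bgr_to_gray_numpy(pixels)
print(gray_pixels)
```

In this sketch the four pixels map to 255, 29, 150, and 76, which shows how strongly green contributes to perceived brightness compared with blue.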

#### Resizing to a Fixed Height  

This step ensures all images have a consistent height (`target_h`, default = 128) while preserving their aspect ratio.  
- If the image already matches the target height, it is returned unchanged.  
- Otherwise, the width is scaled proportionally to maintain aspect ratio.  
- **Downscaling** uses `INTER_AREA` (better for shrinking), while **upscaling** uses `INTER_LINEAR` (smoother enlargement).  

This standardization is important for batching images and feeding them into models that require uniform input dimensions.

In [12]:
# ----- Resize to Consistent Height -----
def resize_to_fixed_height(gray: np.ndarray, target_h: int = 128) -> np.ndarray:
    h, w = gray.shape[:2]
    if h == target_h:
        return gray
        
    scale = target_h / float(h)
    new_w = max(1, int(round(w * scale)))
    method = cv2.INTER_AREA if target_h < h else cv2.INTER_LINEAR
    
    return cv2.resize(gray, (new_w, target_h), interpolation=method)
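
The width calculation can be checked without any image data. The sketch below isolates the arithmetic from `resize_to_fixed_height` in a hypothetical helper, `scaled_size`, and evaluates it for an upscaled, a downscaled, and an extremely narrow input.

```python
def scaled_size(h: int, w: int, target_h: int = 128) -> tuple:
    # Same arithmetic as resize_to_fixed_height, without touching pixel data
    scale = target_h / float(h)
    new_w = max(1, int(round(w * scale)))
    return target_h, new_w

print(scaled_size(64, 200))    # upscaling by 2x   -> (128, 400)
print(scaled_size(256, 300))   # downscaling by 2x -> (128, 150)
print(scaled_size(1000, 2))    # very narrow input -> width clamped to 1
```

The `max(1, ...)` guard matters for tall, thin inputs: without it the rounded width could reach 0, which is not a valid size for `cv2.resize`.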

#### Noise Removal (Optional)  

This step reduces unwanted noise in grayscale images to improve clarity for OCR and preprocessing tasks.  
- **No Denoising** → returns the image unchanged.  
- **Median Filter** → effective for removing salt-and-pepper noise while preserving edges.  
- **Bilateral Filter** → smooths the image while keeping edges sharp, useful for text-heavy images.  

The method can be chosen via the `method` parameter (`"median"` or `"bilateral"`). If none is specified, the original image is retained.


In [13]:
# ----- Noise Removal -----
def denoise_optional(gray: np.ndarray, method: Optional[Literal["median", "bilateral"]] = None) -> np.ndarray:
    if method is None:
        return gray
    if method == "median":
        return cv2.medianBlur(gray, ksize=3)
    if method == "bilateral":
        return cv2.bilateralFilter(gray, d=5, sigmaColor=20, sigmaSpace=10)
        
    raise ValueError("Unsupported denoise method")
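
To show why a median filter suits salt-and-pepper noise, the following NumPy-only sketch applies a 3x3 median to a flat image with one corrupted pixel. It is an approximation written for illustration (the `median3x3` name and the edge-replication border are assumptions), not a reimplementation of `cv2.medianBlur`.

```python
import numpy as np

def median3x3(gray: np.ndarray) -> np.ndarray:
    h, w = gray.shape
    # Replicate edge pixels so border pixels also have a full 3x3 neighbourhood
    padded = np.pad(gray, 1, mode="edge")
    # Stack the nine shifted views and take the per-pixel median
    windows = np.stack([padded[dy:dy + h, dx:dx + w]
                        for dy in range(3) for dx in range(3)])
    return np.median(windows, axis=0).astype(gray.dtype)

img = np.full((5, 5), 200, dtype=np.uint8)
img[2, 2] = 0      # isolated "pepper" pixel
cleaned = median3x3(img)

# A sharp vertical edge: left columns black, right columns white
edge = np.zeros((5, 6), dtype=np.uint8)
edge[:, 3:] = 255
edge_filtered = median3x3(edge)

print(cleaned[2, 2], np.array_equal(edge, edge_filtered))
```

The isolated pixel is replaced by the surrounding value, while the step edge passes through unchanged, which is the behaviour that makes the median filter attractive for text strokes.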

#### Binarization (Optional)  

This step converts grayscale images into black-and-white (binary) format, making text more distinct and improving OCR performance.  
- **No Binarization** → image remains in grayscale.  
- **Adaptive Mean Thresholding** → the threshold for each pixel is the mean of the pixels in the surrounding `block_size` × `block_size` block, minus the constant `C`.  
- **Adaptive Gaussian Thresholding** → the threshold for each pixel is a Gaussian-weighted sum of the pixels in the surrounding block, minus the constant `C`.  

Both adaptive methods help handle uneven lighting conditions and preserve readability of handwritten/printed text.

In [14]:
# ----- Binarization -----
def binarize_optional(gray: np.ndarray, method: Optional[Literal["adaptive_mean", "adaptive_gaussian"]] = None) -> np.ndarray:
    if method is None:
        return gray
    block_size = 25
    C = 10
    if method == "adaptive_mean":
        return cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_MEAN_C, cv2.THRESH_BINARY, block_size, C)
    if method == "adaptive_gaussian":
        return cv2.adaptiveThreshold(gray, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C, cv2.THRESH_BINARY, block_size, C)
    
    raise ValueError("Unsupported binarization method")
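
The adaptive mean rule can be written out directly in NumPy. The sketch below is a simplified illustration, assuming edge-replicated borders and a small block size; the function name `adaptive_mean_threshold` is hypothetical and the result is not guaranteed to match `cv2.adaptiveThreshold` pixel for pixel.

```python
import numpy as np

def adaptive_mean_threshold(gray: np.ndarray, block_size: int = 3, c: float = 10.0) -> np.ndarray:
    h, w = gray.shape
    r = block_size // 2
    padded = np.pad(gray.astype(np.float64), r, mode="edge")
    # Sum the block_size x block_size neighbourhood of every pixel
    acc = np.zeros((h, w), dtype=np.float64)
    for dy in range(block_size):
        for dx in range(block_size):
            acc += padded[dy:dy + h, dx:dx + w]
    local_mean = acc / (block_size ** 2)
    # A pixel becomes white (255) when it is brighter than local mean minus c
    return np.where(gray > local_mean - c, 255, 0).astype(np.uint8)

flat = np.full((5, 5), 100, dtype=np.uint8)      # uniform background
flat_bin = adaptive_mean_threshold(flat)

dot = np.full((5, 5), 200, dtype=np.uint8)
dot[2, 2] = 0                                    # one dark "ink" pixel
dot_bin = adaptive_mean_threshold(dot)

print(flat_bin.min(), dot_bin[2, 2], int((dot_bin == 0).sum()))
```

Subtracting `C` is what keeps a uniform background white: each background pixel equals its local mean, so it only clears the threshold because the threshold sits `C` below that mean.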

#### Normalization  

This step scales pixel intensity values from **[0, 255]** to a floating-point range of **[0.0, 1.0]**.  
Normalization ensures consistency across images, improves numerical stability, and helps models train more efficiently.  

In [15]:
# ----- Normalization -----
def normalize_01(img_u8: np.ndarray) -> np.ndarray:
    return (img_u8.astype(np.float32) / 255.0)
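
Dividing by 255 assumes 8-bit input. Because the decoder above uses `cv2.IMREAD_UNCHANGED`, a 16-bit image would keep its bit depth and end up outside [0.0, 1.0]. The sketch below shows one possible way to handle this, using a hypothetical helper `normalize_by_dtype` that scales by the maximum value of the integer dtype; whether the dataset contains any such images is not known.

```python
import numpy as np

def normalize_by_dtype(img: np.ndarray) -> np.ndarray:
    # Hypothetical helper: divide by the largest value the integer dtype can hold
    if not np.issubdtype(img.dtype, np.integer):
        raise TypeError(f"Expected an integer image, got {img.dtype}")
    return img.astype(np.float32) / float(np.iinfo(img.dtype).max)

img8 = np.array([[0, 128, 255]], dtype=np.uint8)
img16 = np.array([[0, 65535]], dtype=np.uint16)

norm8 = normalize_by_dtype(img8)
norm16 = normalize_by_dtype(img16)

print(norm8, norm16)
```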

#### Padding  

This step standardizes the image width to a fixed target while preserving height.  
- If the image is wider than the target, it is **cropped** to its leftmost `target_w` columns, regardless of the alignment setting.  
- If it is narrower, it is **padded** with a constant value (default: white background = `1.0`).  
- Padding can be applied **left-aligned** or **center-aligned**, ensuring consistent dimensions for model training.  

In [16]:
# ----- Padding -----
def pad_to_width(img: np.ndarray, target_w: int, pad_value: float = 1.0, align: Literal["left", "center"] = "left") -> np.ndarray:
    h, w = img.shape
    
    if w == target_w:
        return img
    if w > target_w:
        return img[:, :target_w]

    out = np.full((h, target_w), fill_value=pad_value, dtype=img.dtype)
    
    if align == "left":
        out[:, :w] = img
    elif align == "center":
        offset = (target_w - w) // 2
        out[:, offset:offset + w] = img
    else:
        raise ValueError("Unsupported align")
        
    return out
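
The two alignment modes are easiest to compare on a tiny array. The sketch below reproduces the same offsets with `np.pad`; the helper `pad_widths` is hypothetical and exists only to make the left/right column counts explicit.

```python
import numpy as np

def pad_widths(w: int, target_w: int, align: str = "left") -> tuple:
    # Number of columns to add on the (left, right) side
    extra = target_w - w
    if align == "left":
        return 0, extra
    if align == "center":
        left = extra // 2
        return left, extra - left
    raise ValueError("Unsupported align")

img = np.zeros((2, 3), dtype=np.float32)   # a tiny all-black "image"

left_aligned = np.pad(img, ((0, 0), pad_widths(3, 8, "left")), constant_values=1.0)
centered = np.pad(img, ((0, 0), pad_widths(3, 8, "center")), constant_values=1.0)

print(left_aligned[0])   # content first, then white padding
print(centered[0])       # 2 padding columns, content, 3 padding columns
```

When the number of extra columns is odd, the integer division puts the spare column on the right, matching the `offset = (target_w - w) // 2` rule in `pad_to_width`.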

### Preprocessing Pipeline

In [18]:
# ----- Preprocessing Pipeline For Single Image -----
def preprocess_image_bytes(image_bytes: bytes,
                           target_h: int = 128,
                           denoise: Optional[Literal["median", "bilateral"]] = None,
                           binarize: Optional[Literal["adaptive_mean", "adaptive_gaussian"]] = None,
                           pad_width: Optional[int] = None,
                           pad_align: Literal["left", "center"] = "left",
                           pad_value: float = 1.0) -> np.ndarray:

    img = decode_image_cv2(image_bytes)
    gray = to_grayscale(img)
    gray = resize_to_fixed_height(gray, target_h=target_h)
    gray = denoise_optional(gray, method=denoise)
    gray = binarize_optional(gray, method=binarize)
    arr = normalize_01(gray)  # float32 [0,1]

    if pad_width is not None:
        arr = pad_to_width(arr, target_w=pad_width, pad_value=pad_value, align=pad_align)
        
    return arr

In [19]:
# ----- Preprocessing Pipeline For Complete Dataframe -----
def preprocess_dataframe(df: pd.DataFrame,
                         image_col: str = "image",
                         label_col: str = "text",
                         source_col: str = "source",
                         target_h: int = 128,
                         denoise: Optional[Literal["median", "bilateral"]] = None,
                         binarize: Optional[Literal["adaptive_mean", "adaptive_gaussian"]] = None,
                         pad_width: Optional[int] = None,
                         pad_align: Literal["left", "center"] = "left",
                         pad_value: float = 1.0) -> Tuple[List[np.ndarray], List[str], List[str]]:

    images, labels, sources = [], [], []
    
    for _, row in df.iterrows():
        b = get_bytes(row[image_col])
        arr = preprocess_image_bytes(
            b,
            target_h=target_h,
            denoise=denoise,
            binarize=binarize,
            pad_width=pad_width,
            pad_align=pad_align,
            pad_value=pad_value,
        )
        
        images.append(arr)
        labels.append(row[label_col])
        sources.append(row[source_col])
        
    return images, labels, sources

## Preprocessing Implementation

In [21]:
# ----- Define Defaults for Current Dataset -----
TARGET_H = 128
DENOISE = None       
BINARIZE = None         
PAD_WIDTH = None        
PAD_ALIGN = "left"
PAD_VALUE = 1.0

# ----- Preprocess Printed Dataset -----
printed_imgs, printed_labels, printed_sources = preprocess_dataframe(
    printed,
    image_col="image",
    label_col="text",
    source_col="source",
    target_h=TARGET_H,
    denoise=DENOISE,
    binarize=BINARIZE,
    pad_width=PAD_WIDTH,
    pad_align=PAD_ALIGN,
    pad_value=PAD_VALUE,
)

# ----- Preprocess Handwritten Dataset -----
hand_imgs, hand_labels, hand_sources = preprocess_dataframe(
    written,
    image_col="image",
    label_col="text",
    source_col="source",
    target_h=TARGET_H,
    denoise=DENOISE,
    binarize=BINARIZE,
    pad_width=PAD_WIDTH,
    pad_align=PAD_ALIGN,
    pad_value=PAD_VALUE,
)
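
With `PAD_WIDTH = None`, the preprocessed images share a height of 128 but keep their individual widths, so they cannot be stacked into a single array as they are. One possible way to form a batch later is sketched below; `stack_with_padding` is a hypothetical helper, and the zero-filled arrays merely stand in for real pipeline output.

```python
import numpy as np

def stack_with_padding(images, pad_value: float = 1.0) -> np.ndarray:
    # Hypothetical helper: right-pad every image to the widest one, then stack
    max_w = max(img.shape[1] for img in images)
    h = images[0].shape[0]
    batch = np.full((len(images), h, max_w), pad_value, dtype=np.float32)
    for i, img in enumerate(images):
        batch[i, :, :img.shape[1]] = img
    return batch

# Stand-ins for preprocessed images: same height, different widths
fake_imgs = [np.zeros((128, 40), dtype=np.float32),
             np.zeros((128, 95), dtype=np.float32)]
batch = stack_with_padding(fake_imgs)

print(batch.shape)
```

Padding to the widest image in each batch, rather than to one global width, avoids wasting memory on short text lines; passing a fixed `pad_width` to the pipeline is the simpler alternative when a model needs one constant input size.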