In [1]:
! pip install opencv-python

Collecting opencv-python
  Downloading opencv_python-4.11.0.86-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (20 kB)
Collecting numpy>=1.21.2 (from opencv-python)
  Downloading numpy-2.2.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (62 kB)
Downloading opencv_python-4.11.0.86-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (63.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.0/63.0 MB[0m [31m638.8 kB/s[0m eta [36m0:00:00[0m00:01[0m00:03[0m
[?25hDownloading numpy-2.2.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (16.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.1/16.1 MB[0m [31m660.7 kB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: numpy, opencv-python
Successfully installed numpy-2.2.4 opencv-python-4.11.0.86


In [2]:
import cv2
import numpy as np
import pytesseract
import re
import argparse
import logging
import sys
from typing import List, Dict, Any

In [3]:
# Configure logging
logging.basicConfig(format='%(levelname)s: %(message)s', level=logging.INFO)

In [4]:
# -----------------------------------------------------------------------------
# 1. Image Pre-processing Functions
# -----------------------------------------------------------------------------
def load_image(image_path: str) -> np.ndarray:
    """Loads the image from the provided path."""
    image = cv2.imread(image_path)
    if image is None:
        raise FileNotFoundError(f"Image not found: {image_path}")
    logging.info("Image loaded successfully.")
    return image

def preprocess_image(image_path: str) -> np.ndarray:
    """
    Reads an image from a file, converts it to grayscale, applies adaptive thresholding,
    performs noise reduction, and deskews the image.
    """
    img = load_image(image_path)
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
    
    # Apply GaussianBlur for noise reduction
    blurred = cv2.GaussianBlur(gray, (5, 5), 0)
    
    # Adaptive thresholding to emphasize text
    proc = cv2.adaptiveThreshold(blurred, 255, cv2.ADAPTIVE_THRESH_GAUSSIAN_C,
                                 cv2.THRESH_BINARY, 11, 2)
    
    # Deskew the image based on non-zero pixel coordinates
    coords = np.column_stack(np.where(proc > 0))
    if coords.size == 0:
        logging.warning("No text detected for deskewing; proceeding without deskew.")
        return proc
    
    angle = cv2.minAreaRect(coords)[-1]
    # Adjust angle based on observation from cv2.minAreaRect
    if angle < -45:
        angle = -(90 + angle)
    else:
        angle = -angle

    logging.info(f"Deskew angle determined: {angle:.2f} degrees")
    (h, w) = proc.shape[:2]
    center = (w // 2, h // 2)
    M = cv2.getRotationMatrix2D(center, angle, 1.0)
    proc = cv2.warpAffine(proc, M, (w, h), flags=cv2.INTER_CUBIC, borderMode=cv2.BORDER_REPLICATE)
    
    return proc


In [5]:
# -----------------------------------------------------------------------------
# 2. OCR Extraction
# -----------------------------------------------------------------------------
def perform_ocr(image: np.ndarray) -> str:
    """
    Uses pytesseract to extract text from the processed image.
    The OCR configuration can be tuned; here we use:
        --oem 3: Default (LSTM only in recent versions)
        --psm 6: Assume a uniform block of text.
    """
    custom_config = r'--oem 3 --psm 6'
    try:
        text = pytesseract.image_to_string(image, config=custom_config)
        logging.info("OCR extraction completed.")
        return text
    except Exception as e:
        logging.error("An error occurred during OCR extraction: " + str(e))
        return ""

In [6]:
# -----------------------------------------------------------------------------
# 3. Layout Analysis via Heuristics
# -----------------------------------------------------------------------------
def is_table_line(line: str) -> bool:
    """
    Simple heuristic:
      If a line contains multiple consecutive spaces or tabs, it may be part of a table.
    """
    return bool(re.search(r'(\t|\s\s+)', line))

def group_lines_into_blocks(text: str) -> List[Dict[str, Any]]:
    """
    Splits OCR text into lines and groups consecutive lines into blocks.
    Each block is tagged as "table" or "paragraph" based on the heuristic.
    Empty lines indicate block boundaries.
    
    Returns:
        A list of dictionaries, each with keys "type" and "lines".
    """
    lines = text.splitlines()
    blocks: List[Dict[str, Any]] = []
    current_block: Dict[str, Any] = {"type": None, "lines": []}

    for line in lines:
        stripped = line.strip()
        if not stripped:
            if current_block["lines"]:
                blocks.append(current_block)
                current_block = {"type": None, "lines": []}
            continue

        line_type = "table" if is_table_line(line) else "paragraph"
        if current_block["type"] is None:
            # Start a new block.
            current_block["type"] = line_type
            current_block["lines"].append(line)
        elif current_block["type"] == line_type:
            current_block["lines"].append(line)
        else:
            blocks.append(current_block)
            current_block = {"type": line_type, "lines": [line]}

    if current_block["lines"]:
        blocks.append(current_block)
    logging.info(f"Grouped text into {len(blocks)} blocks.")
    return blocks


In [7]:
# -----------------------------------------------------------------------------
# 4. Table Parsing and Markdown Conversion
# -----------------------------------------------------------------------------
def parse_table_block(block: Dict[str, Any]) -> List[List[str]]:
    """
    Process a block that is presumed to be table data.
    Splits each line using tab characters or multiple consecutive spaces.
    Returns a 2D list representing rows and columns.
    """
    table_data = []
    for line in block["lines"]:
        # Prefer tabs; if absent, use multiple spaces
        if '\t' in line:
            row = [cell.strip() for cell in line.split('\t') if cell.strip()]
        else:
            row = [cell.strip() for cell in re.split(r'\s{2,}', line) if cell.strip()]
        if row:
            table_data.append(row)
    
    # Normalize the table rows so that every row has the same number of columns.
    if table_data:
        max_cols = max(len(row) for row in table_data)
    else:
        max_cols = 0

    normalized_table = []
    for row in table_data:
        if len(row) < max_cols:
            row.extend([""] * (max_cols - len(row)))
        normalized_table.append(row)
    return normalized_table

In [8]:
def convert_table_to_markdown(table: List[List[str]]) -> str:
    """
    Converts a 2D list (of rows and cells) into a Markdown formatted table.
    The first row is treated as the table header.
    """
    if not table:
        return ""
    
    md_lines = []
    header = table[0]
    md_lines.append(" | ".join(header))
    md_lines.append(" | ".join(["---"] * len(header)))
    
    for row in table[1:]:
        md_lines.append(" | ".join(row))
    
    return "\n".join(md_lines)

In [9]:
def convert_blocks_to_markdown(blocks: List[Dict[str, Any]]) -> str:
    """
    Converts each block into Markdown format:
      • Paragraph blocks are joined as-is.
      • Table blocks are passed through table parsing and converted into Markdown table syntax.
    Returns the entire Markdown document as a string.
    """
    markdown_parts = []

    for block in blocks:
        if block["type"] == "paragraph":
            markdown_parts.append("\n".join(block["lines"]))
        elif block["type"] == "table":
            table = parse_table_block(block)
            md_table = convert_table_to_markdown(table)
            markdown_parts.append(md_table)
        # Append an empty line after each block for readability.
        markdown_parts.append("")
    
    return "\n".join(markdown_parts)

In [10]:
# -----------------------------------------------------------------------------
# 5. Pipeline Assembly and Execution
# -----------------------------------------------------------------------------
def run_pipeline(image_path: str, output_file: str) -> None:
    logging.info("Beginning processing of image: %s", image_path)
    
    processed_img = preprocess_image(image_path)
    ocr_text = perform_ocr(processed_img)
    
    if not ocr_text.strip():
        logging.error("OCR produced no text. Exiting.")
        sys.exit(1)
    
    blocks = group_lines_into_blocks(ocr_text)
    markdown_text = convert_blocks_to_markdown(blocks)
    
    try:
        with open(output_file, "w", encoding="utf-8") as f:
            f.write(markdown_text)
        logging.info("Markdown output successfully saved to: %s", output_file)
    except Exception as e:
        logging.error("Error writing Markdown output: " + str(e))
        sys.exit(1)

In [11]:
# -----------------------------------------------------------------------------
# 6. Command-Line Interface
# -----------------------------------------------------------------------------
def parse_args():
    parser = argparse.ArgumentParser(
        description="Convert scanned financial document images into Markdown."
    )
    parser.add_argument("image", help="Path to the image file to be processed.")
    parser.add_argument(
        "-o", "--output", default="output.md", help="Path for the output Markdown file."
    )
    return parser.parse_args()

def main():
    args = parse_args()
    run_pipeline(args.image, args.output)

In [13]:
if __name__ == "__main__":
    main()

usage: ipykernel_launcher.py [-h] [-o OUTPUT] image
ipykernel_launcher.py: error: the following arguments are required: image


SystemExit: 2