## Using the typhoon-ocr package

In [None]:
# Install the typhoon-ocr package that provides `ocr_document`, imported further down.
!pip install typhoon-ocr

In [None]:
# Skipping Linux-specific poppler-utils install; not needed for Windows and local image use.
# !apt-get update && apt-get install -y poppler-utils

In [None]:
import os
from getpass import getpass

# SECURITY: never hard-code an API key in a notebook. Anything committed here is
# readable by everyone with access to the file and must be treated as leaked
# (revoke it and issue a new one).
# You can get an API key from https://opentyphoon.ai/
# The key is taken from the environment when already set; otherwise it is asked
# for interactively, so it never ends up in the saved notebook.
if not os.environ.get('TYPHOON_OCR_API_KEY'):
    os.environ['TYPHOON_OCR_API_KEY'] = getpass("Typhoon OCR API key: ")

In [None]:
# IPython's Image is imported under an alias: a later cell does
# `from PIL import Image`, and rebinding the bare name `Image` here (e.g. by
# re-running this cell) would make every later `Image.open(...)` call fail.
from IPython.display import Image as DisplayImage, Markdown

# Path of the document image to OCR; adjust it to a file on your machine.
image_path = r'C:/Users/iHC/Desktop/OCRaft-main/BOMs.png'
# As the last expression of the cell, this renders the image inline.
DisplayImage(image_path)

In [None]:
from typhoon_ocr import ocr_document

# You can experiment with the two task_type options, 'default' and 'structure',
# to see which one best suits your use case.

markdown = ocr_document(image_path, task_type = "default")
print(markdown)

In [None]:
# Same image, run with the 'structure' task type for comparison with the 'default' output.
markdown1 = ocr_document(image_path, task_type = "structure")
print(markdown1)

## Manually using either the API or a local model (full Python code)

In [None]:
# Dependencies of the manual pipeline below:
#   ftfy, pypdf, pillow  -> imported by the anchor-text / PDF helper cell
#   openai               -> client used for the API path of process_pdf
#   transformers         -> model/processor classes used for the local path
#   accelerate           -> not imported directly in this notebook; presumably needed by
#                           transformers when loading the model (TODO confirm)
# NOTE(review): `torch` is imported below but not installed here — confirm it is already available.
!pip install ftfy
!pip install pypdf
!pip install pillow
!pip install openai
!pip install accelerate
!pip install transformers

In [None]:
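# Skipping the Linux-specific poppler-utils install (apt-get is not available on Windows).
# NOTE: the code below still shells out to the Poppler command-line tools `pdfinfo` and `pdftoppm`,
# even for image inputs (images are converted to PDF first), so on Windows those tools must be
# installed separately and be on PATH.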
# !apt-get install -y poppler-utils

In [None]:
"""
This code is copied from https://github.com/allenai/olmocr
Under the Apache 2.0 license.
All credit goes to the original authors.
"""
from dataclasses import dataclass
import re
import tempfile
from PIL import Image
import subprocess
import base64
from typing import List, Literal
import random
import ftfy
from pypdf.generic import RectangleObject
from pypdf import PdfReader

@dataclass(frozen=True)
class Element:
    """Common base class for items found on a PDF page; it declares no fields of its own."""
    pass


@dataclass(frozen=True)
class BoundingBox:
    """Immutable rectangle described by two corner points, (x0, y0) and (x1, y1)."""

    x0: float
    y0: float
    x1: float
    y1: float

    @staticmethod
    def from_rectangle(rect: RectangleObject) -> "BoundingBox":
        """Build a BoundingBox from the first four entries of a pypdf rectangle."""
        first, second, third, fourth = (rect[index] for index in range(4))
        return BoundingBox(x0=first, y0=second, x1=third, y1=fourth)


@dataclass(frozen=True)
class TextElement(Element):
    """A piece of page text together with the position it was reported at."""
    text: str
    # Position of the text; _pdf_report fills these from entries 4 and 5 of the
    # combined text/current-transformation matrix.
    x: float
    y: float


@dataclass(frozen=True)
class ImageElement(Element):
    """An image placed on the page, identified by name and located by its bounding box."""
    # XObject name of the image; merged images get their names joined with "+".
    name: str
    # Extent of the image on the page.
    bbox: BoundingBox


@dataclass(frozen=True)
class PageReport:
    """Everything _pdf_report collects for one page: its media box plus text and image elements."""
    # The page's MediaBox.
    mediabox: BoundingBox
    # Text fragments in the order the extraction visitor received them.
    text_elements: List[TextElement]
    # Images drawn on the page (one entry per image draw operation).
    image_elements: List[ImageElement]

def image_to_pdf(image_path):
    """
    Convert an image file into a single-page temporary PDF.

    :param image_path: Path of the image to convert.
    :return: Path of the newly created ``.pdf`` file, or ``None`` when the image
        could not be opened or converted. The file is created with
        ``delete=False``, so the caller is responsible for removing it.
    """
    import os  # local import: this cell does not import os itself

    filename = None
    try:
        # The context manager closes the underlying image file handle.
        with Image.open(image_path) as img:
            # Reserve a unique path; the handle is closed right away so that
            # the PDF writer can open the same path (required on Windows).
            with tempfile.NamedTemporaryFile(delete=False, suffix=".pdf") as tmp:
                filename = tmp.name
            # Convert image to RGB if necessary and save as PDF.
            rgb_image = img if img.mode == "RGB" else img.convert("RGB")
            rgb_image.save(filename, "PDF")
        return filename
    except Exception:
        # Best-effort contract: callers test for None instead of catching.
        # Do not leave a half-written temporary file behind, though.
        if filename is not None:
            try:
                os.remove(filename)
            except OSError:
                pass  # nothing more can be done about a stale temp file
        return None

def get_pdf_media_box_width_height(local_pdf_path: str, page_num: int) -> tuple[float, float]:
    """
    Get the MediaBox width and height of one page of a PDF using the pdfinfo command.

    :param local_pdf_path: Path to the PDF file
    :param page_num: The page number (1-based) for which to extract MediaBox dimensions
    :return: A ``(width, height)`` tuple computed from the MediaBox coordinates
    :raises FileNotFoundError: If the ``pdfinfo`` executable cannot be found
    :raises ValueError: If pdfinfo fails, or its output has no usable MediaBox line
    """
    # Construct the pdfinfo command to extract info for the specific page
    command = ["pdfinfo", "-f", str(page_num), "-l", str(page_num), "-box", "-enc", "UTF-8", local_pdf_path]

    # Run the command using subprocess
    try:
        result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except FileNotFoundError as err:
        # Same exception type as before, but with a message that says what to do.
        raise FileNotFoundError(
            "The `pdfinfo` executable was not found. Install the Poppler command-line tools "
            "and make sure they are on PATH."
        ) from err

    # Check if there is any error in executing the command
    if result.returncode != 0:
        raise ValueError(f"Error running pdfinfo: {result.stderr}")

    # Parse the output to find MediaBox
    for line in result.stdout.splitlines():
        if "MediaBox" in line:
            # Split on the first colon only, so the value part is kept whole.
            media_box_str: List[str] = line.split(":", 1)[1].strip().split()
            media_box: List[float] = [float(x) for x in media_box_str]
            if len(media_box) < 4:
                raise ValueError(f"Unexpected MediaBox line in the PDF info: {line!r}")
            return abs(media_box[0] - media_box[2]), abs(media_box[3] - media_box[1])

    raise ValueError("MediaBox not found in the PDF info.")

def render_pdf_to_base64png(local_pdf_path: str, page_num: int, target_longest_image_dim: int = 2048) -> str:
    """
    Rasterise one PDF page with ``pdftoppm`` and return the PNG as a base64 string.

    The resolution passed to ``pdftoppm`` is derived from the page's longest
    MediaBox side and ``target_longest_image_dim``.

    :param local_pdf_path: Path to the PDF file
    :param page_num: Page to render (passed as both first and last page)
    :param target_longest_image_dim: Desired size, in pixels, of the longest image side
    :return: Base64 text of whatever ``pdftoppm`` wrote to stdout
    """
    page_size = get_pdf_media_box_width_height(local_pdf_path, page_num)
    longest_side = max(page_size)

    # Scale factor handed to pdftoppm's -r option.
    resolution = target_longest_image_dim * 72 / longest_side
    page_arg = str(page_num)
    command = ["pdftoppm", "-png", "-f", page_arg, "-l", page_arg, "-r", str(resolution), local_pdf_path]

    completed = subprocess.run(command, timeout=120, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    assert completed.returncode == 0, completed.stderr

    png_bytes = completed.stdout
    return base64.b64encode(png_bytes).decode("utf-8")


def _linearize_pdf_report(report: PageReport, max_length: int = 4000) -> str:
    """
    Render a PageReport as a plain-text listing ("anchor text").

    The first line is always ``Page dimensions: <x1>x<y1>`` taken from the
    media box. Each merged image then contributes an ``[Image ...]`` line and
    each non-blank text element a ``[XxY]text`` line.

    If the full listing fits in ``max_length`` characters it is returned as is
    (image lines first, then text lines). Otherwise the elements with the
    extreme coordinates are always kept, the remaining ones are added in random
    order until the next one would not fit, and the kept lines are sorted by
    position.

    NOTE: the truncated form is not deterministic (``random.shuffle``), and the
    always-kept edge elements are added without a length check, so the result
    can exceed ``max_length``.

    :param report: The page data to render
    :param max_length: Character budget; below 20 only the dimensions line is returned
    :return: The listing as a single string
    """
    result = ""
    result += f"Page dimensions: {report.mediabox.x1:.1f}x{report.mediabox.y1:.1f}\n"

    # With a tiny budget there is no room for any element line.
    if max_length < 20:
        return result

    # Overlapping / adjacent images are reported as one merged image.
    images = _merge_image_elements(report.image_elements)

    # Process image elements
    image_strings = []
    for element in images:
        image_str = f"[Image {element.bbox.x0:.0f}x{element.bbox.y0:.0f} to {element.bbox.x1:.0f}x{element.bbox.y1:.0f}]\n"
        # Use element's unique identifier (e.g., id or position) for comparison
        image_strings.append((element, image_str))

    # Process text elements
    text_strings = []
    for element in report.text_elements:  # type: ignore
        # Whitespace-only text carries no information; skip it.
        if len(element.text.strip()) == 0:  # type: ignore
            continue

        element_text = _cleanup_element_text(element.text)  # type: ignore
        text_str = f"[{element.x:.0f}x{element.y:.0f}]{element_text}\n"  # type: ignore
        text_strings.append((element, text_str))

    # Combine all elements with their positions for sorting
    # Each entry is (kind, element, rendered line, (x, y) position).
    all_elements: list[tuple[str, ImageElement, str, tuple[float, float]]] = []
    for elem, s in image_strings:
        position = (elem.bbox.x0, elem.bbox.y0)
        all_elements.append(("image", elem, s, position))
    for elem, s in text_strings:
        position = (elem.x, elem.y)  # type: ignore
        all_elements.append(("text", elem, s, position))

    # Calculate total length
    total_length = len(result) + sum(len(s) for _, _, s, _ in all_elements)

    if total_length <= max_length:
        # Include all elements
        for _, _, s, _ in all_elements:
            result += s
        return result

    # Identify elements with min/max coordinates
    # (a set works here because the element dataclasses are frozen, hence hashable)
    edge_elements = set()

    if images:
        min_x0_image = min(images, key=lambda e: e.bbox.x0)
        max_x1_image = max(images, key=lambda e: e.bbox.x1)
        min_y0_image = min(images, key=lambda e: e.bbox.y0)
        max_y1_image = max(images, key=lambda e: e.bbox.y1)
        edge_elements.update([min_x0_image, max_x1_image, min_y0_image, max_y1_image])

    if report.text_elements:
        text_elements = [e for e in report.text_elements if len(e.text.strip()) > 0]
        if text_elements:
            min_x_text = min(text_elements, key=lambda e: e.x)
            max_x_text = max(text_elements, key=lambda e: e.x)
            min_y_text = min(text_elements, key=lambda e: e.y)
            max_y_text = max(text_elements, key=lambda e: e.y)
            edge_elements.update([min_x_text, max_x_text, min_y_text, max_y_text])  # type: ignore

    # Keep track of element IDs to prevent duplication
    selected_element_ids = set()
    selected_elements = []

    # Include edge elements first
    # (membership is by value equality, de-duplication by object identity)
    for elem_type, elem, s, position in all_elements:
        if elem in edge_elements and id(elem) not in selected_element_ids:
            selected_elements.append((elem_type, elem, s, position))
            selected_element_ids.add(id(elem))

    # Calculate remaining length
    current_length = len(result) + sum(len(s) for _, _, s, _ in selected_elements)
    # NOTE: computed but never read afterwards.
    _remaining_length = max_length - current_length

    # Exclude edge elements from the pool
    remaining_elements = [(elem_type, elem, s, position) for elem_type, elem, s, position in all_elements if id(elem) not in selected_element_ids]

    # Sort remaining elements by their positions (e.g., x-coordinate and then y-coordinate)
    # remaining_elements.sort(key=lambda x: (x[3][0], x[3][1]))

    # Shuffle remaining elements randomly
    random.shuffle(remaining_elements)

    # Add elements until reaching max_length
    # (stops at the first element that does not fit; later, shorter ones are not tried)
    for elem_type, elem, s, position in remaining_elements:
        if current_length + len(s) > max_length:
            break
        selected_elements.append((elem_type, elem, s, position))
        selected_element_ids.add(id(elem))
        current_length += len(s)

    # Sort selected elements by their positions to maintain logical order
    # (x first, then y)
    selected_elements.sort(key=lambda x: (x[3][0], x[3][1]))

    # Build the final result
    for _, _, s, _ in selected_elements:
        result += s

    return result


def _cap_split_string(text: str, max_length: int) -> str:
    if len(text) <= max_length:
        return text

    head_length = max_length // 2 - 3
    tail_length = head_length

    head = text[:head_length].rsplit(" ", 1)[0] or text[:head_length]
    tail = text[-tail_length:].split(" ", 1)[-1] or text[-tail_length:]

    return f"{head} ... {tail}"


def _cleanup_element_text(element_text: str) -> str:
    """
    Normalise one text fragment for the anchor-text listing.

    The text is repaired with ftfy and stripped, square brackets and
    newline / carriage-return / tab characters are replaced by backslash
    escapes, and the result is capped at 250 characters.
    """
    length_cap = 250
    escapes = {"[": "\\[", "]": "\\]", "\n": "\\n", "\r": "\\r", "\t": "\\t"}
    escape_pattern = re.compile("|".join(map(re.escape, escapes)))

    repaired = ftfy.fix_text(element_text).strip()

    # Brackets delimit positions in the listing, so literal ones must be escaped.
    escaped = escape_pattern.sub(lambda found: escapes[found.group(0)], repaired)

    return _cap_split_string(escaped, length_cap)

def _merge_image_elements(images: List[ImageElement], tolerance: float = 0.5) -> List[ImageElement]:
    """
    Merge images whose bounding boxes overlap or lie within ``tolerance`` of each other.

    Images are clustered transitively with a union-find structure. Each cluster
    becomes one ImageElement whose box covers all members and whose name is the
    member names joined with "+".
    """
    count = len(images)
    parent = list(range(count))  # union-find forest: every index starts as its own root

    def find(node):
        # Locate the root, then point every node on the path straight at it.
        root = node
        while parent[root] != root:
            root = parent[root]
        while parent[node] != node:
            next_node = parent[node]
            parent[node] = root
            node = next_node
        return root

    def union(first, second):
        # Attach the root of `first` beneath the root of `second`.
        root_first, root_second = find(first), find(second)
        if root_first != root_second:
            parent[root_first] = root_second

    def bboxes_overlap(b1: BoundingBox, b2: BoundingBox, tolerance: float) -> bool:
        # Gap between the boxes along each axis; 0 when they overlap on that axis.
        gap_x = max(0, max(b1.x0, b2.x0) - min(b1.x1, b2.x1))
        gap_y = max(0, max(b1.y0, b2.y0) - min(b1.y1, b2.y1))
        return gap_x <= tolerance and gap_y <= tolerance

    # Link every pair of images that are close enough.
    for i in range(count):
        for j in range(i + 1, count):
            if bboxes_overlap(images[i].bbox, images[j].bbox, tolerance):
                union(i, j)

    # Collect member indices per root, in ascending index order.
    clusters: dict[int, list[int]] = {}
    for index in range(count):
        clusters.setdefault(find(index), []).append(index)

    # Fold each cluster into a single element.
    merged = []
    for members in clusters.values():
        first, *others = members
        combined_bbox = images[first].bbox
        combined_name = images[first].name

        for index in others:
            other_bbox = images[index].bbox
            combined_bbox = BoundingBox(
                x0=min(combined_bbox.x0, other_bbox.x0),
                y0=min(combined_bbox.y0, other_bbox.y0),
                x1=max(combined_bbox.x1, other_bbox.x1),
                y1=max(combined_bbox.y1, other_bbox.y1),
            )
            combined_name += f"+{images[index].name}"

        merged.append(ImageElement(name=combined_name, bbox=combined_bbox))

    return merged

def _transform_point(x, y, m):
    x_new = m[0] * x + m[2] * y + m[4]
    y_new = m[1] * x + m[3] * y + m[5]
    return x_new, y_new

def _mult(m: List[float], n: List[float]) -> List[float]:
    return [
        m[0] * n[0] + m[1] * n[2],
        m[0] * n[1] + m[1] * n[3],
        m[2] * n[0] + m[3] * n[2],
        m[2] * n[1] + m[3] * n[3],
        m[4] * n[0] + m[5] * n[2] + n[4],
        m[4] * n[1] + m[5] * n[3] + n[5],
    ]

def _pdf_report(local_pdf_path: str, page_num: int) -> PageReport:
    """
    Collect the text fragments and image placements of one PDF page.

    The page's text extraction is run with two visitors: one records every text
    fragment with its position, the other records every image that is drawn.

    :param local_pdf_path: Path to the PDF file
    :param page_num: 1-based page number (converted to a 0-based index here)
    :return: A PageReport with the page's media box, text elements and image elements
    """
    reader = PdfReader(local_pdf_path)
    page = reader.pages[page_num - 1]
    resources = page.get("/Resources", {})
    xobjects = resources.get("/XObject", {})
    text_elements, image_elements = [], []

    def visitor_body(text, cm, tm, font_dict, font_size):
        # Combine the text matrix with the current transformation matrix;
        # entries 4 and 5 of the product are used as the text position.
        txt2user = _mult(tm, cm)
        text_elements.append(TextElement(text, txt2user[4], txt2user[5]))

    def visitor_op(op, args, cm, tm):
        # Only the "Do" operator (draw an XObject) is of interest.
        if op != b"Do":
            return
        xobject_name = args[0]
        xobject = xobjects.get(xobject_name)
        if xobject and xobject["/Subtype"] == "/Image":
            # The image is placed according to the CTM: map the corners (0, 0)
            # and (1, 1) through it to get the image's box on the page.
            x0, y0 = _transform_point(0, 0, cm)
            x1, y1 = _transform_point(1, 1, cm)
            # min/max keep the box well-formed when the CTM flips an axis.
            image_elements.append(ImageElement(xobject_name, BoundingBox(min(x0, x1), min(y0, y1), max(x0, x1), max(y0, y1))))

    page.extract_text(visitor_text=visitor_body, visitor_operand_before=visitor_op)

    return PageReport(
        mediabox=BoundingBox.from_rectangle(page.mediabox),
        text_elements=text_elements,
        image_elements=image_elements,
    )

def get_anchor_text(
    local_pdf_path: str, page: int, pdf_engine: Literal["pdftotext", "pdfium", "pypdf", "topcoherency", "pdfreport"], target_length: int = 4000
) -> str:
    """
    Produce the anchor text for one page of a PDF.

    Only the "pdfreport" engine is implemented; any other engine name raises
    NotImplementedError.

    :param local_pdf_path: Path to the PDF file
    :param page: 1-based page number
    :param pdf_engine: Name of the extraction engine
    :param target_length: Character budget passed on as ``max_length``
    :return: The linearized page report
    """
    assert page > 0, "Pages are 1-indexed in pdf-land"

    if pdf_engine != "pdfreport":
        raise NotImplementedError("Unknown engine")

    page_report = _pdf_report(local_pdf_path, page)
    return _linearize_pdf_report(page_report, max_length=target_length)

In [None]:
from typing import Callable

def _default_prompt(base_text):
    """Build the 'default' prompt: markdown output with markdown tables."""
    instructions = (
        "Below is an image of a document page along with its dimensions. "
        "Simply return the markdown representation of this document, presenting tables in markdown format as they naturally appear.\n"
        "If the document contains images, use a placeholder like dummy.png for each image.\n"
        "Your final output must be in JSON format with a single key `natural_text` containing the response.\n"
    )
    return instructions + f"RAW_TEXT_START\n{base_text}\nRAW_TEXT_END"


def _structure_prompt(base_text):
    """Build the 'structure' prompt: markdown output with HTML tables and figure tags."""
    instructions = (
        "Below is an image of a document page, along with its dimensions and possibly some raw textual content previously extracted from it. "
        "Note that the text extraction may be incomplete or partially missing. Carefully consider both the layout and any available text to reconstruct the document accurately.\n"
        "Your task is to return the markdown representation of this document, presenting tables in HTML format as they naturally appear.\n"
        "If the document contains images or figures, analyze them and include the tag <figure>IMAGE_ANALYSIS</figure> in the appropriate location.\n"
        "Your final output must be in JSON format with a single key `natural_text` containing the response.\n"
    )
    return instructions + f"RAW_TEXT_START\n{base_text}\nRAW_TEXT_END"


# Maps a task type to the function that turns anchor text into the full prompt.
PROMPTS_SYS = {
    "default": _default_prompt,
    "structure": _structure_prompt,
}

def get_prompt(prompt_name: str) -> Callable[[str], str]:
    """
    Look up the prompt builder registered under ``prompt_name``.

    :param prompt_name: The identifier for the desired prompt.
    :return: A function that takes the anchor text and returns the prompt
        string. For an unknown name, a function that ignores its argument and
        returns an "invalid prompt name" message.
    """
    if prompt_name in PROMPTS_SYS:
        return PROMPTS_SYS[prompt_name]
    return lambda x: "Invalid PROMPT_NAME provided."

In [None]:
import base64
from io import BytesIO
import json
import os
from transformers import AutoProcessor,Qwen2_5_VLForConditionalGeneration
import torch
from openai import OpenAI

def _extract_natural_text(text_output):
    """Return the `natural_text` field of the model's JSON reply, or an error message."""
    # Best-effort by design: a malformed reply yields a readable message, not an exception.
    try:
        json_data = json.loads(text_output)
        return json_data.get('natural_text', "").replace("<figure>", "").replace("</figure>", "")
    except Exception as e:
        return f"⚠️ Could not extract `natural_text` from output.\nError: {str(e)}"


def _run_api_inference(messages):
    """Send the chat messages to the Typhoon OpenAI-compatible API and return the reply text."""
    # Credentials come from the environment, never from literals in the notebook.
    # You can get an API key from https://opentyphoon.ai/
    api_key = os.getenv("TYPHOON_OCR_API_KEY") or os.getenv("TYPHOON_API_KEY")
    if not api_key:
        raise ValueError(
            "No API key found: set the TYPHOON_OCR_API_KEY (or TYPHOON_API_KEY) environment variable. "
            "You can get an API key from https://opentyphoon.ai/"
        )
    client = OpenAI(
        base_url=os.getenv("TYPHOON_BASE_URL", "https://api.opentyphoon.ai/v1"),
        api_key=api_key,
    )

    response = client.chat.completions.create(
        model="typhoon-ocr-preview",
        messages=messages,
        max_tokens=16384,
        extra_body={
            "repetition_penalty": 1.2,
            "temperature": 0.1,
            "top_p": 0.6,
        },
    )
    return response.choices[0].message.content


def _run_local_inference(messages, page_image):
    """Run the messages and page image through the local typhoon-ocr-7b model and return the reply text."""
    # NOTE: the model is loaded on every call, which is slow; cache it if you process many pages.
    model = Qwen2_5_VLForConditionalGeneration.from_pretrained("scb10x/typhoon-ocr-7b", torch_dtype=torch.bfloat16).eval()
    processor = AutoProcessor.from_pretrained("scb10x/typhoon-ocr-7b")

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)

    # Apply the chat template and processor
    text = processor.apply_chat_template(messages, tokenize=False, add_generation_prompt=True)
    inputs = processor(
        text=[text],
        images=[page_image],
        padding=True,
        return_tensors="pt",
    )
    inputs = {key: value.to(device) for (key, value) in inputs.items()}

    # Generate the output
    output = model.generate(
        **inputs,
        temperature=0.1,
        max_new_tokens=12000,
        num_return_sequences=1,
        repetition_penalty=1.2,
        do_sample=True,
    )

    # Decode only the newly generated tokens (everything after the prompt).
    prompt_length = inputs["input_ids"].shape[1]
    new_tokens = output[:, prompt_length:]
    decoded = processor.tokenizer.batch_decode(new_tokens, skip_special_tokens=True)
    return decoded[0]


def process_pdf(pdf_or_image_file, page_num, task_type, engine_call):
    """
    OCR one page of a PDF (or a single image) and return it as markdown.

    :param pdf_or_image_file: Path to a PDF or image file; ``None`` means nothing was uploaded
    :param page_num: 1-based page number to process
    :param task_type: Prompt to use, a key of PROMPTS_SYS ('default' or 'structure')
    :param engine_call: "api" to call the Typhoon API; any other value runs the local model
    :return: ``(page_image, markdown)`` on success, or ``(None, error_message)``
        when no file was given or the image could not be converted to PDF
    """
    if pdf_or_image_file is None:
        return None, "No file uploaded"

    source_path = os.fspath(pdf_or_image_file)
    filename = source_path  # default to original file if PDF
    created_temp_pdf = False

    # If the file is not a PDF, assume it's an image and convert it to PDF.
    # The extension check ignores case so that "SCAN.PDF" is not treated as an image.
    if not source_path.lower().endswith(".pdf"):
        filename = image_to_pdf(source_path)
        if filename is None:
            return None, "Error converting image to PDF"
        created_temp_pdf = True

    try:
        # Render the requested page to base64 PNG.
        image_base64 = render_pdf_to_base64png(filename, page_num, target_longest_image_dim=1800)

        # Extract anchor text from the same page of the PDF.
        anchor_text = get_anchor_text(filename, page_num, pdf_engine="pdfreport", target_length=8000)
    finally:
        # The temporary PDF is only needed for the two steps above.
        if created_temp_pdf:
            try:
                os.remove(filename)
            except OSError:
                pass  # best-effort cleanup; a leftover temp file is harmless

    image_pil = Image.open(BytesIO(base64.b64decode(image_base64)))

    # Retrieve and fill in the prompt template with the anchor_text
    prompt_template_fn = get_prompt(task_type)
    PROMPT = prompt_template_fn(anchor_text)

    # Create a messages structure including text and image URL
    messages = [{
        "role": "user",
        "content": [
            {"type": "text", "text": PROMPT},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{image_base64}"}},
        ],
    }]

    if engine_call == "api":
        text_output = _run_api_inference(messages)
    else:
        text_output = _run_local_inference(messages, image_pil)

    # The model is asked to answer with JSON containing a `natural_text` key.
    markdown_out = _extract_natural_text(text_output)

    return image_pil, markdown_out


## In this demo, we run Typhoon OCR inference via our API.

You can get an API key from https://opentyphoon.ai/

In [None]:
# Input document; change this to a PDF or image file that exists on your machine.
pdf_or_image_file = "/content/MDA_2016_Thai_v2.pdf"
page_num = 1 # For a multi-page PDF, you can specify which page to process
task_type = "default" # Two task types are supported; try both 'default' and 'structure'

# Demo via API
image_pil, markdown_out = process_pdf(pdf_or_image_file, page_num ,task_type, engine_call = "api")

# Demo via the local model (requires a GPU)
#image_pil, markdown_out = process_pdf(pdf_or_image_file, page_num ,task_type, engine_call = "local")


In [None]:
from IPython.display import display, HTML, Markdown
# Show the rendered page image, then the OCR result rendered as markdown.
display(image_pil)
display(Markdown(markdown_out))