# Document Processing with Daft

👋 Hello and welcome to [Daft](http://www.daft.ai/)! This tutorial shows how to use Daft to create a typical PDF processing pipeline. By the end of the tutorial, we will have a fully functional pipeline that:
- [starts with downloading PDFs from an S3 bucket](https://colab.research.google.com/drive/1QeYdSz87DBauPsokN3RNBLXOVB1zexD5#scrollTo=Pg8UgK_3XqWN)
- [extracts text boxes either using OCR or by reading the file format](https://colab.research.google.com/drive/1QeYdSz87DBauPsokN3RNBLXOVB1zexD5#scrollTo=Bilsa6-2zCk5)
- [performs spatial layout analysis to group text boxes into either lines or paragraphs](https://colab.research.google.com/drive/1QeYdSz87DBauPsokN3RNBLXOVB1zexD5#scrollTo=ycj_Q7tA8dBf)
- [computes embeddings using a lightweight LLM, running locally](https://colab.research.google.com/drive/1QeYdSz87DBauPsokN3RNBLXOVB1zexD5#scrollTo=YWzrR3sY8tBo)
- [saves everything as Parquet files](https://colab.research.google.com/drive/1QeYdSz87DBauPsokN3RNBLXOVB1zexD5#scrollTo=wvl_Our9-N4o)


**tl;dr**: If you'd like, you can [jump to the end to see the full pipeline](https://colab.research.google.com/drive/1QeYdSz87DBauPsokN3RNBLXOVB1zexD5#scrollTo=R1_rosls8ykn).

Before we see any code, let's install Daft and all of the dependencies we'll use in this tutorial!

In [None]:
! pip install daft[aws] pillow pydantic PyMuPDF pytesseract sentence-transformers pydantic-to-pyarrow pdf2image accelerate ftfy regex tqdm git+https://github.com/openai/CLIP.git



This is the complete set of imports that we'll use throughout this notebook tutorial. We'll evaluate them once here and reuse them in later cells.

In [None]:
import io
import traceback
from collections.abc import Iterator
from datetime import datetime
from types import NoneType
from typing import Any, Optional, Union, get_args, get_origin

import clip
import fitz
import numpy as np
import pyarrow
import pytesseract
import torch
from PIL import Image
from pydantic import BaseModel
from pydantic_to_pyarrow import get_pyarrow_schema
from sentence_transformers import SentenceTransformer

import daft
from daft import Series, col, udf
from daft.udf import UDF

# Our PDF Data

First, we get the S3 URLs for all of the PDFs that we'll use here.

In [None]:
IO_CONFIG = daft.io.IOConfig(s3=daft.io.S3Config(anonymous=True))

df_sample = daft.from_glob_path(
    "s3://daft-public-data/tutorials/document-processing/industry_documents_library/pdfs/*",
    io_config=IO_CONFIG,
).limit(100)

To get a sense for what this data looks like, we can use the [`show`](https://docs.daft.ai/en/stable/api/dataframe/#daft.DataFrame.show) method on the `daft.DataFrame`. This doesn't materialize the entire DataFrame. Instead, it only does enough computation to show us the first 10 rows.

In [None]:
df_sample.show(10)

#### Downloading PDFs

We can use Daft to download these PDFs from S3 in parallel! Let's see what that looks like on this sample:

In [None]:
df_sample = df_sample.collect()

_start = datetime.now()
df_sample_downloaded = df_sample.with_column("pdf_bytes", col("path").url.download(io_config=IO_CONFIG))
df_sample_downloaded = df_sample_downloaded.collect()
_end = datetime.now()
print(f"Downloaded {df_sample_downloaded.count_rows()} PDFs from S3 in {_end - _start}")

print(df_sample_downloaded)

In [None]:
df_sample_downloaded["path"]

Daft knows about URLs and has built-in support for downloading their contents! This is exposed via the [`.url.download()` method](https://docs.daft.ai/en/stable/api/expressions/#daft.expressions.expressions.ExpressionUrlNamespace.download) on a column expression (that's the [`col('path')`](https://docs.getdaft.io/en/stable/api/expressions/#daft.expressions.col)).

### Pydantic Document Classes

Let's switch back to building out a document processing pipeline. We know that we can get the PDF bytes and load up each document. But, for our pipeline, we'd like to have a structured representation for the content we care about in each document. Namely, the text!

Documents are two-dimensional: when doing document processing, we care about what the document says and _where_ it says it. What page? Where on the page? We can often make inferences on what role a piece of text fills by where it occurs on a page. For example, if we're processing forms, something right next to the "First Name:" field is _probably_ someone's first name.

If we're doing ML after this pipeline, we will absolutely want to provide this spatial information to our model.

So, before we can define any steps in our pipeline, we will need to define some Pydantic classes to help us represent a document!

In [None]:
class BoundingBox(BaseModel):
    x: int
    y: int
    w: int
    h: int

    def as_cropbox(self) -> tuple[int, int, int, int]:
        """Returns (x0, y0, x1, y1)"""
        return (
            self.x,
            self.y,
            self.x + self.w,
            self.y + self.h,
        )


class TextBlock(BaseModel):
    text: str
    bounding_box: BoundingBox


class DPI(BaseModel):
    height: float
    width: float


class ParsedPage(BaseModel):
    page_index: int
    text_blocks: list[TextBlock]
    images: list[BoundingBox]
    page_height: int
    page_width: int
    dpi: Optional[DPI]


class ParsedPdf(BaseModel):
    pdf_path: str
    total_pages: int
    pages: list[ParsedPage]


class TextLine(BaseModel):
    words: list[TextBlock]
    bounding_box: BoundingBox


class IndexedTextBlock(BaseModel):
    index: int
    text: TextBlock


class Processed(BaseModel):
    page_index: int
    indexed_texts: list[IndexedTextBlock]
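
To make the coordinate conventions concrete, here is a minimal sketch of the two conversions these classes rely on: `(x, y, w, h)` to an `(x0, y0, x1, y1)` crop box, and rounded corner coordinates back to `(x, y, w, h)`. `Box` is a hypothetical plain-dataclass stand-in for `BoundingBox` (so the snippet runs without Pydantic), and `from_corners` is an illustrative helper that is not part of the classes above.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Box:
    """Hypothetical stand-in for the tutorial's BoundingBox (no Pydantic needed)."""

    x: int
    y: int
    w: int
    h: int

    def as_cropbox(self) -> tuple[int, int, int, int]:
        """Return corner coordinates (x0, y0, x1, y1)."""
        return (self.x, self.y, self.x + self.w, self.y + self.h)

    @classmethod
    def from_corners(cls, x0: float, y0: float, x1: float, y1: float) -> "Box":
        """Illustrative helper: round the corners, then derive width and height."""
        x0, y0, x1, y1 = (int(round(v)) for v in (x0, y0, x1, y1))
        return cls(x=x0, y=y0, w=x1 - x0, h=y1 - y0)


# Corner coordinates as a PDF library might report them (floats).
box = Box.from_corners(10.2, 20.7, 110.4, 45.1)
cropbox = box.as_cropbox()
```

Storing width and height rather than the second corner keeps the model compact, while `as_cropbox` gives the corner form that image-cropping APIs typically expect.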

### Generating Daft Datatypes from Pydantic

We also need to define a function that will let us easily generate Daft DataTypes from our Pydantic classes using PyArrow. We'll use this function, `daft_pyarrow_datatype`, to let us automatically generate the [`daft.DataType`](https://docs.getdaft.io/en/v0.2.13/api_docs/datatype.html#):

In [None]:
def daft_pyarrow_datatype(f_type: type[Any]) -> daft.DataType:
    return daft.DataType.from_arrow_type(pyarrow_datatype(f_type))


def pyarrow_datatype(f_type: type[Any]) -> pyarrow.DataType:
    if get_origin(f_type) is Union:
        targs = get_args(f_type)
        if len(targs) == 2:
            if targs[0] is NoneType and targs[1] is not NoneType:
                refined_inner = targs[1]
            elif targs[0] is not NoneType and targs[1] is NoneType:
                refined_inner = targs[0]
            else:
                raise TypeError(f"Cannot convert a general union type {f_type} into a pyarrow.DataType!")
            inner_type = pyarrow_datatype(refined_inner)
        else:
            raise TypeError(f"Cannot convert a general union type {f_type} into a pyarrow.DataType!")

    elif get_origin(f_type) is list:
        targs = get_args(f_type)
        if len(targs) != 1:
            raise TypeError(
                f"Expected list type {f_type} with inner element type but " f"got {len(targs)} inner-types: {targs}"
            )
        element_type = targs[0]
        inner_type = pyarrow.list_(pyarrow_datatype(element_type))

    elif get_origin(f_type) is dict:
        targs = get_args(f_type)
        if len(targs) != 2:
            raise TypeError(
                f"Expected dict type {f_type} with inner key-value types but got " f"{len(targs)} inner-types: {targs}"
            )
        kt, vt = targs
        pyarrow_kt = pyarrow_datatype(kt)
        pyarrow_vt = pyarrow_datatype(vt)
        inner_type = pyarrow.map_(pyarrow_kt, pyarrow_vt)

    elif get_origin(f_type) is tuple:
        raise TypeError(f"Cannot support tuple types: {f_type}")

    elif issubclass(f_type, BaseModel):
        schema = get_pyarrow_schema(f_type)
        inner_type = pyarrow.struct([(f, schema.field(f).type) for f in schema.names])

    elif issubclass(f_type, str):
        inner_type = pyarrow.string()

    elif issubclass(f_type, bool):
        # bool must be checked before int: bool is a subclass of int in Python.
        inner_type = pyarrow.bool_()

    elif issubclass(f_type, int):
        inner_type = pyarrow.int64()

    elif issubclass(f_type, float):
        inner_type = pyarrow.float64()

    elif issubclass(f_type, bytes):
        inner_type = pyarrow.binary()

    elif issubclass(f_type, datetime):
        inner_type = pyarrow.date64()

    else:
        raise TypeError(f"Cannot handle general Python objects in Arrow: {f_type}")

    return inner_type

We will use `daft_pyarrow_datatype` when we define the `return_dtype` in our upcoming user-defined functions (UDF)!
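
The type-inspection part of this conversion can be tried on its own with the standard library. The sketch below is a simplified illustration, not the function above: `unwrap_optional` is a hypothetical helper that shows how `get_origin` and `get_args` expose the pieces of `Optional[T]` and `list[T]`, and why the order of `issubclass` checks matters for `bool`.

```python
import types
from typing import Optional, Union, get_args, get_origin

# `X | None` reports types.UnionType as its origin on some Python versions and
# typing.Union on others, so accept both.
_UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))


def unwrap_optional(tp):
    """Return T for Optional[T]; return tp unchanged if it is not a union.

    Raises TypeError for unions that are not exactly "T or None".
    """
    if get_origin(tp) not in _UNION_ORIGINS:
        return tp
    args = get_args(tp)
    non_none = [arg for arg in args if arg is not type(None)]
    if len(args) != 2 or len(non_none) != 1:
        raise TypeError(f"Only Optional[T] is supported, got {tp}")
    return non_none[0]


inner = unwrap_optional(Optional[int])  # the wrapped type
origin_of_list = get_origin(list[str])  # the container type
element_types = get_args(list[str])  # the element type(s)
bool_is_int = issubclass(bool, int)  # bool is a subclass of int
```

Because `issubclass(bool, int)` is true, any chain of `issubclass` checks has to test for `bool` before `int` if the two should map to different Arrow types.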

# Loading & Parsing PDFs using UDFs

The first part of our pipeline is to load the PDF's contents, locate all text, and put these results into our `ParsedPdf` class.

This procedure can either perform OCR to locate text boxes or it can inspect the PDF and, if it is supported, directly extract text boxes. Note that there are no guarantees that a PDF contains extractable text. And even if it does, there are no guarantees that the text boxes will make sense from a human readability standpoint.

We will create a user defined function (UDF) to allow Daft to load and parse our PDFs. This UDF, `LoadDirectAndParsePdf`, will use supporting functions for performing OCR with Tesseract or for extracting text out of the file via PyMuPDF.

In [None]:
# Daft needs you to tell it what the expected output of any UDF looks like.
# We do this by specifying the return_dtype value.
#
# We're using our:
#     (a) automatic Pydantic-to-Daft datatype function
#     (b) Pydantic class
#
#                   (a) here
#                  ⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄
#                                        (b) and here!
#                                       ⌄⌄⌄⌄⌄⌄⌄⌄⌄
@udf(return_dtype=daft_pyarrow_datatype(ParsedPdf))
class LoadDirectAndParsePdf:
    def __init__(self, ocr: bool, page_limit: Optional[int], extract_images: bool) -> None:
        self.ocr = ocr
        self.page_limit = page_limit
        self.extract_images = extract_images

    def handle(self, url: str, pdf_bytes: bytes) -> ParsedPdf:
        bytes_buffer = io.BytesIO(pdf_bytes)
        with fitz.open(stream=bytes_buffer, filetype="pdf") as pdf:
            if self.ocr:
                parsed_doc = ocr_document(pdf, page_limit=self.page_limit)
            else:
                parsed_doc = process_all_pages(
                    pdf,
                    extract_images=self.extract_images,
                    page_limit=self.page_limit,
                    get_image_dpi=False,
                )
        parsed_doc.pdf_path = url
        return parsed_doc

    def __call__(self, urls: Series, pdf_bytes: Series) -> Series:
        return Series.from_pylist(
            # NOTE: it is **vital** to call .model_dump() on each Pydantic class.
            #       Daft handles converting the data into an Arrow record, using
            #       the DataType derived from the Pydantic class. However, it
            #       expects the data to be in a regular Python dictionary.
            [self.handle(u, p).model_dump() for u, p in zip(urls, pdf_bytes)]
        )


def ocr_document(doc: fitz.Document, *, page_limit: Optional[int] = None) -> ParsedPdf:
    pages: list[ParsedPage] = []
    for page_index in range(min(page_limit, len(doc)) if page_limit is not None else len(doc)):
        page = doc[page_index]

        image = rasterize_page(page)
        # Pillow reports DPI as (horizontal, vertical), i.e. (width, height).
        dpi_width, dpi_height = image.info.get("dpi", (72, 72))
        dpi = DPI(height=dpi_height, width=dpi_width)

        text_blocks = ocr_page(image)

        ocred_page = ParsedPage(
            page_index=page_index,
            text_blocks=text_blocks,
            images=extract_image_with_bbox(page),
            page_height=int(round(page.rect.height)),
            page_width=int(round(page.rect.width)),
            dpi=dpi,
        )
        pages.append(ocred_page)

    return ParsedPdf(
        pdf_path="",
        total_pages=len(doc),
        pages=pages,
    )


def rasterize_page(page: fitz.Page, scale: float = 2.0) -> Image.Image:
    # Create transformation matrix for scaling
    mat = fitz.Matrix(scale, scale)

    # Render page to pixmap
    pix = page.get_pixmap(matrix=mat)

    img_data = pix.tobytes("png")
    image = Image.open(io.BytesIO(img_data))
    return image


def ocr_page(image: Image.Image) -> list[TextBlock]:
    results = []
    data = pytesseract.image_to_data(image, output_type=pytesseract.Output.DICT)
    for i in range(len(data["text"])):
        text_content = data["text"][i].strip()
        if text_content:
            text_block = TextBlock(
                text=text_content,
                bounding_box=BoundingBox(
                    x=data["left"][i],
                    y=data["top"][i],
                    h=data["height"][i],
                    w=data["width"][i],
                ),
            )
            results.append(text_block)
    return results


def process_all_pages(
    doc: fitz.Document,
    *,
    extract_images: bool,
    page_limit: Optional[int] = None,
    get_image_dpi: bool = False,
) -> ParsedPdf:
    pages = []
    for page_index in range(min(page_limit, len(doc)) if page_limit is not None else len(doc)):
        pages.append(
            process_page(
                page_index,
                doc[page_index],
                extract_images=extract_images,
                get_image_dpi=get_image_dpi,
            )
        )
    return ParsedPdf(pdf_path="", total_pages=len(doc), pages=pages)


def process_page(
    page_index: int,
    page: fitz.Page,
    *,
    extract_images: bool,
    get_image_dpi: bool = False,
) -> ParsedPage:
    text_data = extract_text_with_bbox(page)

    if extract_images:
        image_data = extract_image_with_bbox(page)
    else:
        image_data = []

    if get_image_dpi:
        image = rasterize_page(page)
        # Pillow reports DPI as (horizontal, vertical), i.e. (width, height).
        dpi_width, dpi_height = image.info.get("dpi", (72, 72))
        maybe_dpi = DPI(height=dpi_height, width=dpi_width)
    else:
        maybe_dpi = None

    page_data: ParsedPage = ParsedPage(
        page_index=page_index,
        text_blocks=text_data,
        images=image_data,
        page_height=int(round(page.rect.height)),
        page_width=int(round(page.rect.width)),
        dpi=maybe_dpi,
    )
    return page_data


def extract_text_with_bbox(page: fitz.Page) -> list[TextBlock]:
    text_blocks = []
    blocks = page.get_text("dict")  # type: ignore
    for block in blocks.get("blocks", []):
        if "lines" in block:  # Text block
            for line in block["lines"]:
                for span in line["spans"]:
                    text = span["text"].strip()
                    if text:
                        x0, y0, x1, y1 = tuple(map(lambda z: int(round(z)), span["bbox"]))
                        w = x1 - x0
                        h = y1 - y0
                        text_block = TextBlock(
                            text=text,
                            bounding_box=BoundingBox(x=x0, y=y0, w=w, h=h),
                        )
                        text_blocks.append(text_block)
    return text_blocks


def extract_image_with_bbox(page: fitz.Page) -> list[BoundingBox]:
    image_bboxes = []
    for inst in page.get_image_info():  # type: ignore
        x0, y0, x1, y1 = tuple(map(lambda z: int(round(z)), inst["bbox"]))
        w = x1 - x0
        h = y1 - y0
        image_bboxes.append(BoundingBox(x=x0, y=y0, w=w, h=h))
    return image_bboxes

### Sample on first PDF

Let's see what it looks like to perform OCR and extract text from the first PDF in our collection.

In [None]:
df_first_1 = df_sample.limit(1)
df_first_1 = df_first_1.with_column("pdf_bytes", df_first_1["path"].url.download(io_config=IO_CONFIG))
df_first_1 = df_first_1.collect()
print(df_first_1)

pdf = fitz.open(stream=io.BytesIO(df_first_1.to_pylist()[0]["pdf_bytes"]), filetype="pdf")

In [None]:
ocr_text_boxes = ocr_document(pdf, page_limit=1).pages[0]
print(f"{len(ocr_text_boxes.text_blocks)} OCR'd text boxes on the first page. Sample:")
# Slice instead of indexing so pages with fewer than 25 text boxes don't raise.
for tb in ocr_text_boxes.text_blocks[:25]:
    print(f"text='{tb.text}' {tb.bounding_box}")

In [None]:
extracted_text_boxes = process_all_pages(pdf, extract_images=True, page_limit=1, get_image_dpi=False).pages[0]
print(f"{len(extracted_text_boxes.text_blocks)} text boxes listed in PDF on the first page. Sample:")
# Slice instead of indexing so pages with fewer than 25 text boxes don't raise.
for tb in extracted_text_boxes.text_blocks[:25]:
    print(f"text='{tb.text}' {tb.bounding_box}")

In [None]:
pdf.close()

# Document Processing

Now that we can load PDFs and format them into our `ParsedPdf` Pydantic class, we can start to define the steps of our document processing pipeline!

Here, we will define the `DocProcessor` UDF, which uses spatial heuristics to group the extracted `TextBlock`s into more coherent and usable groups of text. We also make sure to sort the text into reading order, which we define here as left-to-right, top-to-bottom. We can choose to only group things into lines (only looking at the Y axis). Or we can choose to group into paragraphs (looking at the Y and X axes).

These heuristics have controllable thresholds (`{x,y}_thresh` for grouping words into lines, and `row_tolerance` for the reading-order sort and the paragraph grouping). They _will not_ work in all use cases and domains! In general, document layout analysis is hard. There are excellent research models for performing layout analysis which can easily be run with Daft :), but we'll save that for another day!

Below is the code for this UDF and its supporting helper functions.

In [None]:
class PipelineConfig(BaseModel):
    row_tolerance: int = 10
    y_thresh: int = 15
    x_thresh: int = 60
    group_paragraphs: bool = True


@udf(return_dtype=daft_pyarrow_datatype(list[Processed]))
class DocProcessor:
    def __init__(self, *, row_tolerance: int, y_thresh: int, x_thresh: int, group_paragraphs: bool) -> None:
        self.row_tolerance = row_tolerance
        self.y_thresh = y_thresh
        self.x_thresh = x_thresh
        self.group_paragraphs = group_paragraphs

    def handle(self, doc: ParsedPdf | dict) -> Iterator[Processed]:
        parsed_doc = ParsedPdf.model_validate(doc) if isinstance(doc, dict) else doc
        for page_index, page in enumerate(parsed_doc.pages):
            text_blocks = page_pipeline(
                page,
                row_tolerance=self.row_tolerance,
                y_thresh=self.y_thresh,
                x_thresh=self.x_thresh,
                group_paragraphs=self.group_paragraphs,
            )
            yield Processed(
                page_index=page_index,
                indexed_texts=[IndexedTextBlock(index=i, text=t) for i, t in enumerate(text_blocks)],
            )

    def __call__(self, parsed: Series) -> Series:
        return Series.from_pylist(
            # Again, note the call to .model_dump() on each Pydantic object.
            [[processed.model_dump() for processed in self.handle(doc)] for doc in parsed]
        )


def page_pipeline(
    parsed_page: ParsedPage,
    *,
    row_tolerance: int,
    y_thresh: int,
    x_thresh: int,
    group_paragraphs: bool,
) -> list[TextBlock]:
    reading_order_text_blocks = sort_bounding_boxes_reading_order(parsed_page.text_blocks, row_tolerance=row_tolerance)
    text_lines: list[TextLine] = group_into_text_lines(
        reading_order_text_blocks,
        y_thresh=y_thresh,
        x_thresh=x_thresh,
    )

    if group_paragraphs:
        simplified_text_lines = group_into_paragraphs(text_lines, row_tolerance=row_tolerance)
    else:
        simplified_text_lines = [revert_to_tb(tl) for tl in text_lines]

    final_texts = sort_bounding_boxes_reading_order(simplified_text_lines, row_tolerance=row_tolerance)

    return final_texts


def sort_bounding_boxes_reading_order(boxes: list[TextBlock], *, row_tolerance: int) -> list[TextBlock]:
    """Sort a list of BoundingBox objects into reading order (left-to-right, top-to-bottom).

    This function implements a multi-line sorting algorithm that:
    1. Groups text blocks by approximate row (within a tolerance)
    2. Sorts blocks within each row from left to right
    3. Sorts rows from top to bottom

    Args:
        boxes: list of TextBlock objects (each carrying a bounding box) to sort
        row_tolerance: Tolerance for grouping blocks into the same row, in the
                       same units as the bounding boxes (pixels for OCR output).
                       Adjust this value based on your document's line spacing.

    Returns:
        list of TextBlock objects sorted in reading order
    """
    if not boxes:
        return []

    boxes = boxes.copy()

    # Group boxes by approximate row
    rows: dict[int, list[TextBlock]] = {}

    for tbox in boxes:
        # Find which row this box belongs to
        placed = False
        for row_y, tboxes_in_row in rows.items():
            if abs(tbox.bounding_box.y - row_y) <= row_tolerance:
                tboxes_in_row.append(tbox)
                placed = True
                break

        if not placed:
            # Create a new row
            rows[tbox.bounding_box.y] = [tbox]

    # Sort each row from left to right
    for tboxes_in_row in rows.values():
        tboxes_in_row.sort(key=lambda box: box.bounding_box.x)

    # Sort rows from top to bottom
    finished_rows = [(row_y, tboxes_in_row) for row_y, tboxes_in_row in rows.items()]
    finished_rows.sort(key=lambda x: x[0])

    # Flatten the result
    result: list[TextBlock] = []
    for _, tboxes_in_row in finished_rows:
        result.extend(tboxes_in_row)

    return result


def group_into_text_lines(
    reading_order_text_boxes: list[TextBlock],
    *,
    y_thresh: int,
    x_thresh: int,
) -> list[TextLine]:
    # Group into lines
    lines: list[list[TextBlock]] = []
    for word in reading_order_text_boxes:
        assigned = False
        for line in lines:
            last_word: TextBlock = line[-1]
            same_line_y: bool = abs(word.bounding_box.y - last_word.bounding_box.y) <= y_thresh
            close_x: bool = word.bounding_box.x - (last_word.bounding_box.x + last_word.bounding_box.w) <= x_thresh
            if same_line_y and close_x:
                line.append(word)
                assigned = True
                break

        if not assigned:
            lines.append([word])

    # Aggregate lines with bounding boxes
    results: list[TextLine] = [form_text_line(line_words) for line_words in lines]

    return results


def form_text_line(line_words: list[TextBlock]) -> TextLine:
    xs: list[int] = [w.bounding_box.x for w in line_words]
    ys: list[int] = [w.bounding_box.y for w in line_words]
    ws: list[int] = [w.bounding_box.w for w in line_words]
    hs: list[int] = [w.bounding_box.h for w in line_words]

    x_min = min(xs)
    y_min = min(ys)
    x_max = max(x + w for x, w in zip(xs, ws))
    y_max = max(y + h for y, h in zip(ys, hs))

    return TextLine(
        words=line_words,
        bounding_box=BoundingBox(x=x_min, y=y_min, w=x_max - x_min, h=y_max - y_min),
    )


def group_into_paragraphs(text_lines: list[TextLine], *, row_tolerance: int) -> list[TextBlock]:
    paragraphs: list[list[TextLine]] = []
    for tl in text_lines:
        assigned = False
        for p_group in paragraphs:
            p = p_group[-1]
            if abs(p.bounding_box.y + p.bounding_box.h - tl.bounding_box.y) <= row_tolerance:
                p_group.append(tl)
                assigned = True
                break
        if not assigned:
            paragraphs.append([tl])

    simplified_text_lines: list[TextBlock] = []
    for p_group in paragraphs:
        tbs: list[TextBlock] = [revert_to_tb(tl) for tl in p_group]
        paragraph: TextBlock = revert_to_tb(form_text_line(tbs))
        simplified_text_lines.append(paragraph)

    return simplified_text_lines


def revert_to_tb(tl: TextLine) -> TextBlock:
    return TextBlock(
        text=" ".join(w.text for w in tl.words),
        bounding_box=tl.bounding_box,
    )
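
To see the two main heuristics on a tiny input, here is a stripped-down sketch of the reading-order sort and the line grouping. It follows the same rules as the helpers above but works on a hypothetical `Word` tuple instead of `TextBlock`, so it runs with the standard library alone; the sample coordinates are made up for illustration.

```python
from typing import NamedTuple


class Word(NamedTuple):
    """Hypothetical stand-in for TextBlock: text plus top-left corner and width."""

    text: str
    x: int
    y: int
    w: int


def reading_order(words: list[Word], row_tolerance: int) -> list[Word]:
    """Bucket words into rows by y (within row_tolerance), then sort rows and columns."""
    rows: dict[int, list[Word]] = {}
    for word in words:
        for row_y, row in rows.items():
            if abs(word.y - row_y) <= row_tolerance:
                row.append(word)
                break
        else:
            rows[word.y] = [word]  # no existing row is close enough
    ordered: list[Word] = []
    for row_y in sorted(rows):  # top to bottom
        ordered.extend(sorted(rows[row_y], key=lambda w: w.x))  # left to right
    return ordered


def group_lines(words: list[Word], y_thresh: int, x_thresh: int) -> list[str]:
    """Append each word to the first line whose last word is close in both y and x."""
    lines: list[list[Word]] = []
    for word in words:
        for line in lines:
            last = line[-1]
            same_y = abs(word.y - last.y) <= y_thresh
            close_x = word.x - (last.x + last.w) <= x_thresh
            if same_y and close_x:
                line.append(word)
                break
        else:
            lines.append([word])
    return [" ".join(w.text for w in line) for line in lines]


words = [
    Word("Name:", x=50, y=102, w=40),
    Word("First", x=10, y=100, w=30),
    Word("Date", x=400, y=101, w=30),
    Word("Signature", x=10, y=200, w=60),
]
ordered = reading_order(words, row_tolerance=10)
lines = group_lines(ordered, y_thresh=15, x_thresh=60)
```

Here "Date" shares a row with "First Name:" but sits far to the right, so the `x_thresh` check starts a new line for it. Raising `x_thresh` enough would merge the two.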

# Text Embedding

Now that we have nice groups of text, we can generate embeddings for them! We will define a factory function that builds a UDF from a `SentenceTransformer` model. The resulting UDF will make sure that the generated values are (1) of a fixed size and (2) of a known datatype (your classic 32-bit floating point number!).

In [None]:
def text_embedder_udf(model_name: str) -> UDF:
    model = SentenceTransformer(model_name)
    if torch.cuda.is_available():
        model = model.to("cuda")

    dimensionality = model.get_sentence_embedding_dimension()
    assert dimensionality is not None, f"Must supply model with known dimensionality. Invalid {model=}"
    model = model.eval()
    try:
        model.compile()
    except Exception:
        print("Could not torch.compile the SentenceTransformer model. " "Proceeding with unoptimized inference.")
        traceback.print_exc()

    @udf(return_dtype=daft.DataType.embedding(daft.DataType.float32(), dimensionality))
    def embed(texts: Series) -> Series:
        if len(texts) == 0:
            return Series.from_pylist([])

        with torch.inference_mode():
            embeddings: np.ndarray[float] = model.encode(
                texts.to_pylist(), convert_to_numpy=True, show_progress_bar=False
            )

        return Series.from_numpy(embeddings)

    return embed

# Image Embedding

In both the OCR and PDF file extracting flows, we are able to interrogate the PDF file and ask it to provide us bounding boxes of all elements in the page that are marked as images. It is often useful to be able to analyze all embedded images in a PDF. For instance, we may want to grab these because they're non-vector graphics figures in a scientific paper or charts in a business report.

Similar to the text embedding UDF, we can define an image embedding UDF and use it to produce embeddings for all extracted image elements from our documents.

We will also need to define a UDF to crop each page image according to the bounding boxes for each embedded image. This `ImageBboxProcessor` will need to get access to the original PDF bytes as well as the image bounding box information we have in our `ParsedPdf` object.

In [None]:
@udf(return_dtype=daft.DataType.image())
class ImageBboxProcessor:
    def __init__(self, scale: float = 2.0) -> None:
        self.scale = scale

    def handle(self, pdf_bytes: bytes, doc: Union[ParsedPdf, dict]) -> Iterator[np.ndarray]:
        parsed_doc = ParsedPdf.model_validate(doc) if isinstance(doc, dict) else doc
        bytes_buffer = io.BytesIO(pdf_bytes)
        with fitz.open(stream=bytes_buffer, filetype="pdf") as pdf:
            for i, parsed_page in enumerate(parsed_doc.pages):
                page = pdf[i]
                # rasterize page
                pix = page.get_pixmap(matrix=fitz.Matrix(self.scale, self.scale))  # type: ignore
                image = Image.open(io.BytesIO(pix.tobytes("png")))

                for image_bbox in parsed_page.images:
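                    # The page was rasterized at self.scale, but the stored boxes are in
                    # unscaled page coordinates, so scale the crop box to pixel space.
                    cropbox = tuple(int(round(v * self.scale)) for v in image_bbox.as_cropbox())
                    sub_image = image.crop(cropbox)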
                    sub_image_np = np.array(sub_image)
                    yield sub_image_np

    def __call__(self, pdf_bytes: Series, parsed: Series) -> list[np.ndarray]:
        images = []
        for pdf, doc in zip(pdf_bytes, parsed):
            for image in self.handle(pdf, doc):
                images.append(image)
        return images


def image_embedder_udf(model_name: str) -> UDF:
    device = "cuda" if torch.cuda.is_available() else "cpu"
    model, preprocess = clip.load(model_name, device=device, jit=True)

    @udf(return_dtype=daft.DataType.embedding(daft.DataType.float32(), 512))
    def embed(images: Series) -> Series:
        if len(images) == 0:
            return np.empty((0,), dtype=np.float32)

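        # B x C x H x W: preprocess each image individually, then stack into one batch.
        image_batch = torch.stack([preprocess(Image.fromarray(image)) for image in images.to_pylist()]).to(device)

        with torch.inference_mode():
            embeddings_pt = model.encode_image(image_batch)

        # CLIP may run in half precision on GPU; cast to match the float32 return dtype.
        embeddings_np = embeddings_pt.detach().cpu().numpy().astype(np.float32)
        return Series.from_numpy(embeddings_np)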

    return embed

# Entire End-to-End Pipeline

In Daft, we express our pipeline as a `daft.DataFrame` instance. We use the DataFrame's methods to produce transformations from one view of the data into another. Here, we'll combine all of the functionality defined in this tutorial into a complete DataFrame-based pipeline.


### Options
We will start out by defining the configuration options our pipeline uses -- change _any_ of these values and rerun to see how they affect the pipeline!

In [None]:
# Uses Tesseract to perform OCR if true. Otherwise tries to get text
# directly from the file format.
ocr: bool = False

# Only handle the first N pages of each PDF. Some PDFs are very long and
# can bog-down the pipeline as it waits for stragglers. For fast exploration,
# set this limit. If you want to run on all pages of each PDF, set this to None.
page_limit: Optional[int] = 10

# If true, then extract bounding boxes pertaining to embedded images in the PDF.
extract_images: bool = True

# Determine how text boxes are grouped together to form more semantically
# relevant passages of text. These options control heuristics in the
# DocProcessor UDF. Increasing the thresholds and tolerance will cause more
# distant text boxes to be grouped together.
config = PipelineConfig(
    row_tolerance=10,
    y_thresh=15,
    x_thresh=60,
    group_paragraphs=True,
)

# The text embedding model to use. See HuggingFace for a more complete list!
text_model = "all-MiniLM-L6-v2"

# This is the image embedding model to use. See the CLIP repository for more!
image_model = "ViT-B/32"

Checking some assumptions on our options for the dataframe-based processing pipeline:

In [None]:
if page_limit is not None:
    if page_limit <= 0:
        raise ValueError(f"Page limit must be positive if specified! Invalid: {page_limit=}")
    print(f"Limiting each PDF to the first {page_limit} pages.")
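
Inside the parsing helpers, the limit is applied per document as `min(page_limit, len(doc))`, so short PDFs are never indexed past their last page. A minimal sketch of that rule, where `pages_to_process` is a hypothetical helper name used only for illustration:

```python
from typing import Optional


def pages_to_process(page_limit: Optional[int], n_pages: int) -> range:
    """Hypothetical helper: page indices to visit for a document with n_pages pages."""
    if page_limit is not None and page_limit <= 0:
        raise ValueError(f"Page limit must be positive if specified! Invalid: {page_limit=}")
    # None means "no limit"; otherwise never go past the end of the document.
    return range(n_pages if page_limit is None else min(page_limit, n_pages))


short_doc = list(pages_to_process(10, 3))  # fewer pages than the limit
long_doc = list(pages_to_process(10, 250))  # truncated to the limit
unlimited = len(pages_to_process(None, 250))  # every page
```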

### Step 1: Enumerate S3 Keys

Read the S3 bucket & key prefix and get full keys.

In [None]:
df = daft.from_glob_path(
    "s3://daft-public-data/tutorials/document-processing/industry_documents_library/pdfs/*",
    io_config=IO_CONFIG,
)
print(df.schema())

### Step 2: Download PDFs

Downloads the contents of each PDF file:

In [None]:
df = df.select("path").with_column_renamed("path", "url")
df = df.with_column("pdf_bytes", col("url").url.download(io_config=IO_CONFIG))
print(df.schema())

### Step 3: Load PDFs, Maybe Apply OCR

Use the PDF and OCR libraries to load all of the documents and extract text boxes on their pages:

In [None]:
df = df.with_column(
    "parsed",
    # NOTE: We can easily define a UDF that operates on multiple columns!
    #
    #       This UDF mainly uses the downloaded PDF contents, but it also
    #       includes the URL to make a well-formed ParsedPdf instance.
    #
    #       We're configuring how the UDF operates by supplying the
    #       constructor arguments using the `with_init_args` class method.
    #
    #       These arguments are applied here   and here
    #                                    ⌄⌄⌄  ⌄⌄⌄⌄⌄⌄⌄⌄⌄⌄
    LoadDirectAndParsePdf.with_init_args(ocr, page_limit, extract_images)(col("url"), col("pdf_bytes")),
    #       We're providing the two columns to our UDF
    #                                                     ^^^^^^^^^^  ^^^^^^^^^^^^^^^^
    #                                                         here        and here
)
print(df.exclude("pdf_bytes").schema())

#### UDF Application

A note on how Daft works -- in our above UDF application, we're providing [column expressions](https://docs.getdaft.io/en/stable/core_concepts/#exploding-columns) as input. Specifically, the [`col`](https://docs.getdaft.io/en/stable/api/expressions/#daft.expressions.col) part. When we write `col("url")`, we're telling Daft to wire things up under the hood to reference the data in the `url` column of the dataframe.

Breaking down the PDF loading and parsing UDF call, the first part is actually constructing the UDF instance:
```python
LoadDirectAndParsePdf.with_init_args(ocr, page_limit, extract_images)
```

The second part applies that UDF to our two columns, `url` and `pdf_bytes`:
```python
(col("url"), col("pdf_bytes"))
```

Note that this is equivalent:
```python
f = LoadDirectAndParsePdf.with_init_args(ocr, page_limit, extract_images)
f(col("url"), col("pdf_bytes"))
```
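
The configure-then-apply pattern can be modelled in plain Python. The sketch below is not Daft's implementation: `ToyUdf` and its arguments are hypothetical stand-ins, and it operates on Python lists instead of column expressions.

```python
from functools import partial


class ToyUdf:
    """Hypothetical stand-in for a class-based UDF; not Daft's implementation."""

    def __init__(self, ocr: bool, page_limit: int) -> None:
        # Stage 1: constructor arguments configure how the UDF behaves.
        self.ocr = ocr
        self.page_limit = page_limit

    def __call__(self, urls: list[str], pdf_bytes: list[bytes]) -> list[str]:
        # Stage 2: the configured instance is applied to whole columns.
        mode = "ocr" if self.ocr else "direct"
        return [
            f"{url}: {len(data)} bytes, mode={mode}, first {self.page_limit} pages"
            for url, data in zip(urls, pdf_bytes)
        ]

    @classmethod
    def with_init_args(cls, *args, **kwargs):
        # Bind the constructor arguments now; build the instance when applied.
        factory = partial(cls, *args, **kwargs)
        return lambda *columns: factory()(*columns)


configured = ToyUdf.with_init_args(False, 2)
result = configured(["a.pdf", "b.pdf"], [b"%PDF-1", b"%PDF-1.7"])
print(result)
```

Separating configuration from application keeps the per-pipeline settings (here `ocr` and `page_limit`) out of the per-row data, so the same configured callable can be reused on any pair of columns.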

### Step 4: Text Box Processing

Process the parsed document representation by grouping text boxes into more cohesive units (e.g. lines or paragraphs). The configurable grouping logic uses spatial information to determine each text box's group membership.


In [None]:
df = df.with_column(
    "processed_raw",
    DocProcessor.with_init_args(**config.model_dump())(col("parsed")),
)
print(df.exclude("pdf_bytes", "parsed").schema())

Reformat the structured text box objects (the `ParsedPdf` pydantic class instances) into rows. Each row has the reading order index, the text, the page index, and the bounding box coordinates.

In [None]:
df = (
    df.explode("processed_raw")
    .with_column("page_index", col("processed_raw").struct.get("page_index"))
    .with_column("indexed_texts", col("processed_raw").struct.get("indexed_texts"))
    .explode("indexed_texts")
    .exclude("processed_raw")
)
print(df.exclude("pdf_bytes", "parsed").schema())
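
To make the reshaping concrete, here is the same sequence (explode, lift struct fields, explode again) modelled in plain Python on toy data. This is only a sketch of the semantics: the `explode` helper and the sample values are made up for illustration, and the nested layout is assumed to mirror `processed_raw`.

```python
# Toy stand-in for one document's `processed_raw` value: a list of pages,
# each holding a list of indexed text entries. All values are made up.
documents = [
    {
        "url": "a.pdf",
        "processed_raw": [
            {
                "page_index": 0,
                "indexed_texts": [
                    {"index": 0, "text": "Title"},
                    {"index": 1, "text": "Intro"},
                ],
            },
            {"page_index": 1, "indexed_texts": [{"index": 0, "text": "Body"}]},
        ],
    }
]


def explode(rows, column):
    """Emit one output row per element of the list stored in `column`."""
    for row in rows:
        for element in row[column]:
            yield {**row, column: element}


# First explode: one row per page.
pages = explode(documents, "processed_raw")
# Lift the struct fields into their own columns and drop the struct.
pages = (
    {
        "url": p["url"],
        "page_index": p["processed_raw"]["page_index"],
        "indexed_texts": p["processed_raw"]["indexed_texts"],
    }
    for p in pages
)
# Second explode: one row per text entry on each page.
rows = list(explode(pages, "indexed_texts"))

for row in rows:
    print(row)
```

One document with two pages and three text entries becomes three rows, each carrying its `url` and `page_index`. This toy helper simply drops rows whose list is empty; a real engine may treat empty or null lists differently.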

#### Explaining Structure Access Expressions

Note that we're using [`.struct`](https://docs.getdaft.io/en/v0.4.6/api_docs/doc_gen/expression_methods/daft.struct.html) to construct an expression that allows Daft to extract individual field values from our complex document structure.

When we write `col("text_blocks").struct.get("bounding_box")`, we're telling Daft that we want to access the `bounding_box` field of each element from the `text_blocks` column. From this, we can provide additional field-selecting logic (e.g. `["x"]` to get the value for field `x` on the `bounding_box` value from each structure in `text_blocks`).

The last part of our text box processing step is to extract the text and bounding box coordinates into their own columns. We also want to preserve the reading order index as its own column too.

This format makes it easier to form follow up queries on our data, such as:
"what are the first 10 pieces of text on the first page of each document?" or
"what text appears in the bottom-right quadrant of each page?"

In [None]:
df = (
    df.with_column("text_blocks", col("indexed_texts").struct.get("text"))
    .with_column("reading_order_index", col("indexed_texts").struct.get("index"))
    .exclude("indexed_texts")
    .with_column("text", col("text_blocks").struct.get("text"))
    .with_column("x", col("text_blocks").struct.get("bounding_box")["x"])
    .with_column("y", col("text_blocks").struct.get("bounding_box")["y"])
    .with_column("h", col("text_blocks").struct.get("bounding_box")["h"])
    .with_column("w", col("text_blocks").struct.get("bounding_box")["w"])
    .exclude("text_blocks")
)
print(df.exclude("pdf_bytes", "parsed").schema())
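
The two example questions reduce to simple filters over these flat columns. The sketch below runs them in plain Python over a handful of hypothetical rows. It assumes that coordinates have their origin at the top-left of the page and that every page is 612 by 792 units; the pipeline itself guarantees neither.

```python
# Assumed page size and made-up rows using the column names produced above.
PAGE_W, PAGE_H = 612.0, 792.0

rows = [
    {"url": "a.pdf", "page_index": 0, "reading_order_index": 1,
     "text": "Intro", "x": 72.0, "y": 120.0, "w": 200.0, "h": 12.0},
    {"url": "a.pdf", "page_index": 0, "reading_order_index": 0,
     "text": "Title", "x": 72.0, "y": 72.0, "w": 300.0, "h": 20.0},
    {"url": "a.pdf", "page_index": 0, "reading_order_index": 2,
     "text": "Page 1", "x": 500.0, "y": 750.0, "w": 40.0, "h": 10.0},
    {"url": "a.pdf", "page_index": 1, "reading_order_index": 0,
     "text": "Body", "x": 72.0, "y": 72.0, "w": 400.0, "h": 12.0},
]

# "First 10 pieces of text on the first page", in reading order.
first_page = sorted(
    (r for r in rows if r["page_index"] == 0),
    key=lambda r: r["reading_order_index"],
)
first_texts = [r["text"] for r in first_page[:10]]

# "Text in the bottom-right quadrant": the box's top-left corner lies in it.
bottom_right = [
    r["text"] for r in rows if r["x"] >= PAGE_W / 2 and r["y"] >= PAGE_H / 2
]

print(first_texts)
print(bottom_right)
```

With several documents, the first query would additionally group by `url`. On the Daft DataFrame, the same conditions can be expressed with `where` and `sort` over the `page_index`, `reading_order_index`, `x`, and `y` columns.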

### Step 5: Text Embeddings

The penultimate step is to produce embeddings for each piece of text:

In [None]:
df = df.with_column("text_embeddings", text_embedder_udf(text_model)(col("text")))

### Step 6: Image Embeddings

The final step is to produce embeddings for all embedded images in the PDF. Note that this only occurs if `extract_images` is `True`. Change the configuration to change this behavior.

In [None]:
if extract_images:
    df = df.with_column("images", ImageBboxProcessor(df["pdf_bytes"], df["parsed"]))
    df = df.with_column("image_embeddings", image_embedder_udf(image_model)(df["images"]))
    df = df.exclude("images")
else:
    print("Not embedding images because embedded images were not extracted.")
df = df.exclude("pdf_bytes", "parsed")

## Executing a Lazily Constructed Pipeline

At this point, our `df` value contains the entire pipeline. We can show the final schema with a simple method call:

In [None]:
print(f"Pipeline constructed:\n{df.schema()}")

And we can peek under the hood and ask Daft to show us the entire logical plan that it will run when we ask it to execute our pipeline:

In [None]:
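# `explain()` prints the plan itself, so call it directly instead of
# formatting its return value into a string.
print("Execution plan for pipeline:")
df.explain()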

#### Writing to Parquet

In Daft, [DataFrames are _lazy_](https://docs.getdaft.io/en/stable/core_concepts/#dataframe): they are only evaluated once it is necessary.

By invoking the [`write_parquet`](https://docs.daft.ai/en/stable/api/io/#daft.dataframe.DataFrame.write_parquet) method, we force the DataFrame to evaluate so that its contents can be written out to disk!

If we wanted to keep the values in-memory, we would call [`.collect()`](https://docs.daft.ai/en/stable/api/io/#daft.dataframe.DataFrame.collect) on the DataFrame. This process is known as _materializing the DataFrame_. Note that the `collect` method returns a new DataFrame holding the actual computed values.
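
As a rough mental model of lazy evaluation, the toy class below records steps without running them and only executes when `collect()` is called. It is a sketch for intuition, not how Daft is implemented; `LazyPipeline`, `with_step`, and the sample functions are all hypothetical.

```python
class LazyPipeline:
    """Toy model of lazy evaluation; not how Daft is implemented."""

    def __init__(self, source, steps=()):
        self._source = source
        self._steps = tuple(steps)

    def with_step(self, fn):
        # Only record the step; nothing runs yet.
        return LazyPipeline(self._source, self._steps + (fn,))

    def explain(self):
        # Describe the recorded plan without executing it.
        return " -> ".join(["source"] + [fn.__name__ for fn in self._steps])

    def collect(self):
        # Materialize: run every recorded step over the source rows.
        rows = list(self._source)
        for fn in self._steps:
            rows = [fn(row) for row in rows]
        return rows


calls = []  # records each value `double` is invoked with


def double(x):
    calls.append(x)
    return x * 2


def increment(x):
    return x + 1


pipeline = LazyPipeline([1, 2, 3]).with_step(double).with_step(increment)
plan = pipeline.explain()      # the plan is known before anything runs
calls_before = len(calls)      # no step has executed yet
result = pipeline.collect()    # execution happens here

print(plan)
print(calls_before, result)
```

Because the whole plan is known before execution, an engine built this way can inspect and reorder work before running anything, which is what makes `explain()` possible on an unexecuted DataFrame.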

In [None]:
output_dir = "./doc_proc_tutorial_parquet"
print(f"Writing out as Parquet files to: {output_dir}")

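parquet_out = df.write_parquet(output_dir).to_pydict()
# NOTE: `df` is still lazy, so `count_rows()` below executes the pipeline
#       again to compute the count.
print(f"Complete! Wrote {df.count_rows()} rows:\n{parquet_out}")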