In [1]:
"""AWS Documentation → Chroma Ingestion (Clean Notebook)

Standalone workflow for transforming AWS PDF documentation into a Chroma vector store
with Ollama embeddings, metadata propagation, chunk filtering, and write timeouts.
"""


'AWS Documentation → Chroma Ingestion (Clean Notebook)\n\nStandalone workflow for transforming AWS PDF documentation into a Chroma vector store\nwith Ollama embeddings, metadata propagation, chunk filtering, and write timeouts.\n'

In [2]:
import re
import signal
import uuid
from contextlib import contextmanager
from dataclasses import dataclass, field
from pathlib import Path
from typing import Iterator

import httpx
import ollama
import pandas as pd
from langchain_chroma import Chroma
from langchain_docling import DoclingLoader
from langchain_docling.loader import ExportType
from langchain_ollama.embeddings import OllamaEmbeddings
from langchain_text_splitters import (
    MarkdownHeaderTextSplitter,
    RecursiveCharacterTextSplitter,
)


In [3]:
@dataclass(frozen=True)
class Config:
    """Immutable settings for the AWS documentation → Chroma ingestion run.

    All filesystem locations are derived from ``data_root`` so the notebook can
    be pointed at another directory by changing a single field.

    Raises:
        ValueError: If a sizing field is out of range. Checking here fails fast,
            before any PDF has been downloaded and converted.
    """

    # Directory that holds the input CSV and the Chroma persistence folder.
    data_root: Path = Path(".")
    # File name of the input CSV, relative to ``data_root``.
    csv_filename: str = "AWSDocs.csv"
    # Directory name of the persisted Chroma store, relative to ``data_root``.
    chroma_dirname: str = "chroma_db_AWSDocs"
    # Name of the Chroma collection that receives the chunks.
    collection_name: str = "AWSDocs"
    # Ollama model used to embed the chunks.
    embedding_model: str = "nomic-embed-text"
    # Size and overlap handed to RecursiveCharacterTextSplitter.
    chunk_size: int = 1000
    chunk_overlap: int = 200
    # Number of chunks written to the vector store per add_documents call.
    batch_size: int = 50
    # Wall-clock limit (seconds) for writing a single batch.
    add_timeout_seconds: int = 60
    # HTTP timeout for the Ollama client; built per instance via default_factory.
    request_timeout: httpx.Timeout = field(
        default_factory=lambda: httpx.Timeout(30.0, connect=5.0)
    )
    # (markdown prefix, metadata key) pairs for MarkdownHeaderTextSplitter.
    header_splits: tuple[tuple[str, str], ...] = (
        ("#", "Header 1"),
        ("##", "Header 2"),
        ("###", "Header 3"),
    )

    def __post_init__(self) -> None:
        """Reject sizing values that would only fail later, mid-ingestion."""
        if self.chunk_size <= 0:
            raise ValueError(f"chunk_size must be positive, got {self.chunk_size}")
        if not 0 <= self.chunk_overlap <= self.chunk_size:
            raise ValueError(
                "chunk_overlap must be between 0 and chunk_size "
                f"({self.chunk_size}), got {self.chunk_overlap}"
            )
        if self.batch_size <= 0:
            # A zero/negative batch size breaks the range() step used for batching.
            raise ValueError(f"batch_size must be positive, got {self.batch_size}")
        if self.add_timeout_seconds < 0:
            raise ValueError(
                f"add_timeout_seconds must not be negative, got {self.add_timeout_seconds}"
            )

    @property
    def csv_path(self) -> Path:
        """Full path of the input CSV (``data_root / csv_filename``)."""
        return self.data_root / self.csv_filename

    @property
    def chroma_path(self) -> Path:
        """Full path of the Chroma persistence directory (``data_root / chroma_dirname``)."""
        return self.data_root / self.chroma_dirname


# Build the default configuration used by every later cell; the bare
# expression on the last line echoes it as the cell output.
config = Config()
config


Config(data_root=PosixPath('.'), csv_filename='AWSDocs.csv', chroma_dirname='chroma_db_AWSDocs', collection_name='AWSDocs', embedding_model='nomic-embed-text', chunk_size=1000, chunk_overlap=200, batch_size=50, add_timeout_seconds=60, request_timeout=Timeout(connect=5.0, read=30.0, write=30.0, pool=30.0), header_splits=(('#', 'Header 1'), ('##', 'Header 2'), ('###', 'Header 3')))

In [4]:
def load_documents(config: Config) -> pd.DataFrame:
    """Read the input CSV named by ``config.csv_path`` into a DataFrame.

    Prints the number of rows loaded.

    Raises:
        FileNotFoundError: If the CSV does not exist.
    """
    source_csv = config.csv_path
    if source_csv.exists():
        table = pd.read_csv(source_csv)
        print(f"Loaded {len(table)} rows from {source_csv}")
        return table

    raise FileNotFoundError(f"Input CSV not found at {source_csv}")


# Load the CSV once and preview its first three rows as the cell output.
df = load_documents(config)
df.head(3)


Loaded 46 rows from AWSDocs.csv


Unnamed: 0,Domain,Service,PDF_URL
0,Compute,ec2,https://docs.aws.amazon.com/pdfs/AWSEC2/latest...
1,Compute,lambda,https://docs.aws.amazon.com/pdfs/lambda/latest...
2,Compute,ecs,https://docs.aws.amazon.com/pdfs/AmazonECS/lat...


In [5]:
# Hand the HTTP timeout to the Ollama client through the public `client_kwargs`
# option instead of overwriting the private `_client` attribute afterwards.
# This keeps working if the wrapper changes its internals and is not limited
# to the synchronous client.
embeddings = OllamaEmbeddings(
    model=config.embedding_model,
    client_kwargs={"timeout": config.request_timeout},
)

# Persistent Chroma collection; chunks are embedded with `embeddings` on write.
vector_store = Chroma(
    collection_name=config.collection_name,
    persist_directory=str(config.chroma_path),
    embedding_function=embeddings,
)

# Two-stage splitting: first by markdown headers, then by character count
# within each header section.
markdown_splitter = MarkdownHeaderTextSplitter(list(config.header_splits))
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=config.chunk_size,
    chunk_overlap=config.chunk_overlap,
)


2025-11-12 22:29:22,380 - INFO - Anonymized telemetry enabled. See                     https://docs.trychroma.com/telemetry for more information.


In [6]:
@contextmanager
def time_limit(seconds: float, timeout_message: str) -> Iterator[None]:
    """Raise ``TimeoutError(timeout_message)`` if the ``with`` body runs too long.

    Relies on SIGALRM, so (per the Python ``signal`` documentation) it must be
    used from the main thread on a Unix-like platform.

    Args:
        seconds: Time budget for the body. Fractional values are accepted;
            ``0`` disarms the timer, i.e. no limit is enforced.
        timeout_message: Message carried by the raised ``TimeoutError``.

    Raises:
        TimeoutError: From inside the body once the budget is exhausted.
    """

    def _raise_timeout(_signum, _frame):
        raise TimeoutError(timeout_message)

    original_handler = signal.signal(signal.SIGALRM, _raise_timeout)
    try:
        # Arm the timer inside the try so that a failure while arming (e.g. an
        # invalid `seconds` value) still restores the previous handler.
        signal.setitimer(signal.ITIMER_REAL, seconds)
        yield
    finally:
        # Always disarm the timer and put the previous handler back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, original_handler)


def normalize_str(value: object) -> str:
    """Return ``value`` as a stripped string, mapping missing values to ``""``.

    ``None``, blank strings and anything whose stripped text equals ``"nan"``
    (case-insensitive) all come back as the empty string.
    """
    cleaned = "" if value is None else str(value).strip()
    return "" if cleaned.lower() == "nan" else cleaned


In [7]:
def fetch_markdown(url: str) -> str:
    """Convert the document at ``url`` to markdown via ``DoclingLoader``.

    Returns the page content of the first document the loader yields.

    Raises:
        ValueError: If the loader returns no documents.
    """
    converted = DoclingLoader(
        file_path=url,
        export_type=ExportType.MARKDOWN,
    ).load()
    if converted:
        return converted[0].page_content

    raise ValueError(f"No content returned for {url}")


def should_skip_chunk(chunk) -> tuple[bool, str]:
    """Decide whether a chunk should be left out of the vector store.

    Args:
        chunk: Object exposing a ``metadata`` mapping and a ``page_content`` string.

    Returns:
        ``(True, reason)`` when any of the three header fields mentions
        "table of contents", or when the stripped content consists only of
        pipes, dashes and whitespace; otherwise ``(False, "")``.
    """
    header_keys = ("Header 1", "Header 2", "Header 3")
    header_values = [chunk.metadata.get(key, "") for key in header_keys]
    lowered_headers = " ".join(value.lower() for value in header_values if value)
    if "table of contents" in lowered_headers:
        return True, "Table of Contents"

    body = chunk.page_content.strip()
    # Leftover table rules such as "| --- | --- |" carry no retrievable text.
    only_table_rules = bool(body) and re.fullmatch(r"[|\-\s]+", body) is not None
    if only_table_rules:
        return True, "Markdown table fragment"

    return False, ""


def split_markdown(markdown_content: str, metadata: dict) -> list:
    """Split markdown into filtered, metadata-tagged chunks.

    The text is first split on markdown headers, ``metadata`` is merged into
    every header section, and the sections are then split again by size.
    Chunks rejected by ``should_skip_chunk`` are reported on stdout and dropped.

    Returns:
        The chunks that passed the filter, in their original order.
    """
    sections = markdown_splitter.split_text(markdown_content)
    for section in sections:
        section.metadata.update(metadata)

    kept = []
    sized_pieces = text_splitter.split_documents(sections)
    for position, piece in enumerate(sized_pieces, start=1):
        skip, why = should_skip_chunk(piece)
        if not skip:
            kept.append(piece)
        else:
            print(f"Skipping chunk {position} ({why})")

    return kept


In [8]:
def chunk_batches(chunks: list, size: int) -> Iterator[list]:
    """Yield consecutive slices of ``chunks`` holding at most ``size`` items each."""
    yield from (
        chunks[offset : offset + size] for offset in range(0, len(chunks), size)
    )


def store_chunks(chunks: list, *, source: str, config: Config) -> int:
    """Write ``chunks`` to the vector store in batches, each under a time limit.

    Args:
        chunks: Documents to store, in order.
        source: Identifier of the originating document; used in progress and
            timeout messages and as the seed for the chunk IDs.
        config: Supplies ``batch_size`` and ``add_timeout_seconds``.

    Returns:
        The number of chunks in batches that were written without timing out.
        A batch that exceeds the time limit is reported, skipped and not counted.
    """
    if not chunks:
        return 0

    # Derive IDs from (source, position) instead of random UUIDs so that
    # re-running the ingestion for the same document reuses the same IDs rather
    # than minting a fresh set each time. NOTE(review): this relies on the
    # vector store upserting on matching IDs — confirm for the installed
    # langchain-chroma version.
    chunk_ids = [
        str(uuid.uuid5(uuid.NAMESPACE_URL, f"{source}#chunk-{position}"))
        for position in range(len(chunks))
    ]
    total_batches = (len(chunks) + config.batch_size - 1) // config.batch_size
    stored_chunks = 0

    # Batch documents and IDs with the same helper so they always stay aligned.
    paired_batches = zip(
        chunk_batches(chunks, config.batch_size),
        chunk_batches(chunk_ids, config.batch_size),
    )
    for batch_index, (batch_docs, batch_ids) in enumerate(paired_batches, start=1):
        print(
            f"  Adding batch {batch_index}/{total_batches} "
            f"({len(batch_docs)} chunks)..."
        )

        timeout_message = (
            f"Timed out adding batch {batch_index}/{total_batches} "
            f"for {source} after {config.add_timeout_seconds} seconds"
        )

        try:
            with time_limit(config.add_timeout_seconds, timeout_message):
                vector_store.add_documents(
                    documents=batch_docs,
                    ids=batch_ids,
                )
        except TimeoutError as exc:
            # Best effort: give up on this batch only and carry on with the rest.
            print(f"{exc}. Skipping batch.")
            continue

        stored_chunks += len(batch_docs)

    return stored_chunks


In [9]:
def process_row(row: pd.Series, *, config: Config) -> int:
    """Fetch, split and store the document described by one CSV row.

    Reads ``PDF_URL``, ``Domain`` and ``Service`` from ``row``; the latter two
    plus the URL are attached to every chunk as metadata.

    Returns:
        The number of chunks stored for this row.

    Raises:
        ValueError: If the row has no usable ``PDF_URL``.
    """
    url = normalize_str(row.get("PDF_URL"))
    if not url:
        raise ValueError("Missing PDF_URL value")

    domain = normalize_str(row.get("Domain"))
    service = normalize_str(row.get("Service"))
    chunk_metadata = {"domain": domain, "service": service, "source": url}

    chunks = split_markdown(fetch_markdown(url), chunk_metadata)
    return store_chunks(chunks, source=url, config=config)


def ingest_dataframe(frame: pd.DataFrame, *, config: Config) -> int:
    """Ingest every row of ``frame`` and return the total number of stored chunks.

    Processing is best-effort: a row with no ``PDF_URL`` or whose processing
    raises is reported on stdout and skipped, so one bad document does not
    abort the whole run. The final summary states how many rows were skipped.

    Args:
        frame: Table with ``PDF_URL`` and, optionally, ``Domain`` / ``Service`` columns.
        config: Ingestion settings forwarded to ``process_row``.

    Returns:
        Sum of the chunk counts reported by ``process_row`` for successful rows.
    """
    total_chunks = 0
    skipped_rows = 0

    for index, row in frame.iterrows():
        url = normalize_str(row.get("PDF_URL"))
        if not url:
            print(f"Row {index}: missing PDF_URL. Skipping.")
            skipped_rows += 1
            continue

        print(f"Processing {url}...")
        try:
            stored = process_row(row, config=config)
        except httpx.TimeoutException as exc:
            # TimeoutException is the base class of ReadTimeout and
            # ConnectTimeout, so naming it alone covers all of them.
            print(f"HTTP timeout while processing {url}: {exc}. Skipping.")
            skipped_rows += 1
            continue
        except ValueError as exc:
            print(f"Data issue for {url}: {exc}. Skipping.")
            skipped_rows += 1
            continue
        except Exception as exc:
            # Deliberately broad: keep the batch run alive and move on.
            print(f"Error processing {url}: {exc}")
            skipped_rows += 1
            continue

        total_chunks += stored
        if stored:
            print(f"Completed {url}: stored {stored} chunks.")
        else:
            print(f"Completed {url}: no chunks to store.")
        print()

    print(f"Ingestion complete. Stored {total_chunks} chunks in total.")
    if skipped_rows:
        # Make partial runs visible instead of letting them look like a success.
        print(f"{skipped_rows} row(s) were skipped; see the messages above.")
    return total_chunks


In [None]:
# Run when ready to ingest all rows. Every row of `df` is processed in order;
# `ingest_dataframe` prints per-document progress and its own closing summary,
# and the returned total is echoed once more below.
total_chunks = ingest_dataframe(df, config=config)
print(f"Stored {total_chunks} chunks in total.")


Processing https://docs.aws.amazon.com/pdfs/AWSEC2/latest/UserGuide/ec2-ug.pdf...


2025-11-12 22:30:32,651 - INFO - detected formats: [<InputFormat.PDF: 'pdf'>]
2025-11-12 22:30:32,923 - INFO - Going to convert document batch...
2025-11-12 22:30:32,925 - INFO - Initializing pipeline for StandardPdfPipeline with options hash 44ae89a68fc272bc7889292e9b5a1bad
2025-11-12 22:30:32,959 - INFO - Loading plugin 'docling_defaults'
2025-11-12 22:30:32,962 - INFO - Registered picture descriptions: ['vlm', 'api']
2025-11-12 22:30:32,968 - INFO - Loading plugin 'docling_defaults'
2025-11-12 22:30:32,973 - INFO - Registered ocr engines: ['auto', 'easyocr', 'ocrmac', 'rapidocr', 'tesserocr', 'tesseract']
2025-11-12 22:30:34,110 - INFO - Auto OCR model selected ocrmac.
2025-11-12 22:30:34,118 - INFO - Accelerator device: 'mps'
2025-11-12 22:30:35,949 - INFO - Accelerator device: 'mps'
2025-11-12 22:30:36,702 - INFO - Processing document ec2-ug.pdf
