# KnowledgeForge Workflow E2E Demo (Stage-by-Stage)

This notebook runs each pipeline stage **individually** so you can inspect the
intermediate outputs and understand what happens under the hood:

```
Parse -> Extract -> Transform -> Chunk -> Embed -> Index
```

**Prerequisites:**
- `bgf_factsheet.pdf` in `../reference/` (or place any PDF in `../data/source/factsheets/`)
- Virtual environment activated
- Run from `knowledgeforge/notebooks/` or `knowledgeforge/backend/`

In [1]:
# Step 0: Setup and imports
# Standard library
import json
import os
import shutil
import sys
import time
from collections import Counter
from pathlib import Path

# Put the backend package on the import path and make it the working
# directory; relative paths used in later cells (``../reference``,
# ``../data/source/factsheets``) are resolved from there.
backend_dir = Path("../backend").resolve()
_backend_entry = str(backend_dir)
if _backend_entry not in sys.path:
    sys.path.insert(0, _backend_entry)
os.chdir(backend_dir)

# Project modules -- only importable once the backend dir is on sys.path.
from app.core.config import KnowledgeForgeConfig, load_config
from app.core.workflow_config import load_workflow
from app.db.session import get_engine, get_session_factory, init_db, reset_globals
from app.models.database import Document, WorkflowRun, WorkflowStage
from app.services.parsing import DocumentParser
from app.services.extraction import ContentExtractor, ContentType
from app.services.transformation import ContentTransformer
from app.services.chunking import StructureAwareChunker
from app.services.embedding import Embedder
from app.services.indexing import ChromaIndexer
from app.services.filewatcher import compute_file_hash

print("Imports successful")

Imports successful


In [2]:
# Step 1: Load global config + workflow overlay
reset_globals()
config = load_config()
wf_config = load_workflow("fund_factsheet", config)

# Build the merged service config (same logic as WorkflowOrchestrator):
# source / processing / indexing come from the workflow overlay, while
# database / observability are taken from the global config.
merged_sections = {
    "source": wf_config.source,
    "processing": wf_config.processing,
    "indexing": wf_config.indexing,
    "database": config.database,
    "observability": config.observability,
}
svc_config = KnowledgeForgeConfig(**merged_sections)

global_chunking = config.processing.chunking
wf_chunking = wf_config.processing.chunking

print("Global config")
print(f"  chunk_size_tokens : {global_chunking.chunk_size_tokens}")
print(f"  default_collection: {config.indexing.default_collection}")
print(f"\nWorkflow '{wf_config.name}' overlay")
print(f"  chunk_size_tokens : {wf_chunking.chunk_size_tokens}")
print(f"  chunk_overlap     : {wf_chunking.chunk_overlap_tokens}")
print(f"  default_collection: {wf_config.indexing.default_collection}")
print(f"  watch_folder      : {wf_config.source.watch_folder}")
print(f"  file_patterns     : {wf_config.source.file_patterns}")
print("\nMerged service config")
print(f"  chunk_size_tokens : {svc_config.processing.chunking.chunk_size_tokens}")
print(f"  default_collection: {svc_config.indexing.default_collection}")

Global config
  chunk_size_tokens : 512
  default_collection: default

Workflow 'fund_factsheet' overlay
  chunk_size_tokens : 256
  chunk_overlap     : 25
  default_collection: fund_factsheets
  watch_folder      : ./data/source/factsheets
  file_patterns     : ['*.pdf']

Merged service config
  chunk_size_tokens : 256
  default_collection: fund_factsheets


In [3]:
# Step 2: Initialise database + prepare document
engine = get_engine(config.database.url)
init_db(engine)
session_factory = get_session_factory(engine)

# Stage the input PDF in the factsheets folder. The bundled reference PDF is
# preferred; if it is absent, fall back to a PDF already placed in the folder
# (the alternative offered in the prerequisites above).
reference_pdf = Path("../reference/bgf_factsheet.pdf").resolve()
factsheets_dir = Path("../data/source/factsheets").resolve()
factsheets_dir.mkdir(parents=True, exist_ok=True)
if reference_pdf.is_file():
    target_pdf = factsheets_dir / reference_pdf.name
    shutil.copy2(str(reference_pdf), str(target_pdf))
else:
    candidate_pdfs = sorted(factsheets_dir.glob("*.pdf"))
    if not candidate_pdfs:
        raise FileNotFoundError(
            f"No input PDF found: {reference_pdf} does not exist and "
            f"{factsheets_dir} contains no *.pdf files"
        )
    target_pdf = candidate_pdfs[0]

file_hash = compute_file_hash(str(target_pdf))
print(f"File   : {target_pdf.name}")
print(f"Size   : {target_pdf.stat().st_size:,} bytes")
print(f"SHA-256: {file_hash[:24]}...")

# Register a new Document row whose version is one past the latest existing
# row for this file + workflow. try/finally guarantees the session is closed
# even when a query or the commit raises.
session = session_factory()
try:
    existing = (
        session.query(Document)
        .filter_by(file_name=target_pdf.name, workflow_id="fund_factsheet")
        .order_by(Document.version.desc())
        .first()
    )
    new_version = (existing.version + 1) if existing else 1

    doc = Document(
        file_name=target_pdf.name,
        file_path=str(target_pdf),
        file_type="pdf",
        version=new_version,
        file_hash=file_hash,
        workflow_id="fund_factsheet",
        status="pending",
    )
    session.add(doc)
    session.commit()
    # Reload the row's attributes before closing so `doc` stays readable below.
    session.refresh(doc)
finally:
    session.close()
print(f"Doc id : {doc.id[:12]}..., version={doc.version}")

File   : bgf_factsheet.pdf
Size   : 229,656 bytes
SHA-256: 73734621591939a6999e5336...
Doc id : 11c41a4d-76f..., version=9


In [4]:
# Stage 1: PARSE (Docling document conversion)
# Converts the staged PDF and reports page / item statistics from the result.
parser = DocumentParser(svc_config)
t0 = time.time()
parse_result = parser.parse(doc.file_path)
parse_time = time.time() - t0

for _summary_line in (
    f"Time          : {parse_time:.1f}s",
    f"Pages         : {parse_result.page_count}",
    f"Est. tokens   : {parse_result.estimated_token_count:,}",
    f"Content types : {dict(parse_result.content_types)}",
    f"Raw text len  : {len(parse_result.raw_text):,} chars",
):
    print(_summary_line)

print("\nPer-page breakdown:")
for page_stats in parse_result.structure:
    print(f"  Page {page_stats.page_number}: {dict(page_stats.content_types)}")

print("\nRaw text preview (first 500 chars):")
print(parse_result.raw_text[:500])

  from .autonotebook import tqdm as notebook_tqdm
Parameter `strict_text` has been deprecated and will be ignored.


Time          : 137.9s
Pages         : 4
Est. tokens   : 3,373
Content types : {'section_header': 24, 'text': 100, 'page_footer': 5, 'page_header': 1, 'table': 4, 'picture': 10}
Raw text len  : 14,171 chars

Per-page breakdown:
  Page 1: {'section_header': 9, 'text': 63, 'table': 2, 'picture': 2}
  Page 2: {'section_header': 4, 'text': 9, 'page_footer': 2, 'table': 1, 'picture': 4}
  Page 3: {'page_header': 1, 'section_header': 5, 'text': 15, 'page_footer': 2, 'table': 1, 'picture': 3}
  Page 4: {'section_header': 6, 'text': 13, 'page_footer': 1, 'picture': 1}

Raw text preview (first 500 chars):
## BGF World Technology Fund

## A2 U.S. Dollar BlackRock Global Funds

December 2025

Unless otherwise stated, Performance, Portfolio Breakdowns and Net Asset information as at: 31-Dec-2025.

## INVESTMENT OBJECTIVE

The   World   Technology   Fund   seeks   to   maximise   total   return.   The   Fund   invests globally  at   least   70%   of   its   total   assets   in   the   equity   secu

In [None]:
# Keep a handle on the raw Docling document from the Stage-1 parse result
# for ad-hoc inspection.
docling_doc = parse_result.raw_document

In [None]:
# Quick structural overview of the raw Docling document: number of items per
# label (e.g. text, section_header, table, picture).
Counter(
    item.label.value if hasattr(item, "label") else type(item).__name__
    for item, _level in docling_doc.iterate_items()
)

In [None]:
from docling.datamodel.base_models import InputFormat
from docling.datamodel.pipeline_options import (
    AcceleratorOptions,
    PdfPipelineOptions,
    TableFormerMode,
    TableStructureOptions,
)
from docling.document_converter import DocumentConverter, PdfFormatOption
from IPython.display import display, Image as IPImage
import io

# Re-parse with picture image generation enabled, so each picture item can be
# rendered inline via get_image().
pdf_opts = PdfPipelineOptions(
    do_ocr=False,
    force_backend_text=True,
    do_table_structure=True,
    table_structure_options=TableStructureOptions(mode=TableFormerMode.FAST),
    do_picture_classification=False,
    do_picture_description=False,
    generate_page_images=True,   # needed for get_image() to work
    generate_picture_images=True,
    accelerator_options=AcceleratorOptions(device="auto"),
)
pdf_format_option = PdfFormatOption(pipeline_options=pdf_opts)
converter = DocumentConverter(format_options={InputFormat.PDF: pdf_format_option})
result = converter.convert(str(target_pdf))
doc_with_images = result.document

# Display all pictures, noting any that carry no image data.
for i, pic in enumerate(doc_with_images.pictures):
    page_no = "?"
    if pic.prov:
        page_no = pic.prov[0].page_no
    print(f"\n--- Picture {i} (page {page_no}) ---")
    img = pic.get_image(doc_with_images)
    if not img:
        print("  (no image data)")
        continue
    display(img)

In [5]:
# Stage 2: EXTRACT (structured content extraction)
# Turns the parse result into a flat list of typed items and summarises them.
extractor = ContentExtractor(svc_config)
t0 = time.time()
extracted = extractor.extract(parse_result)
extract_time = time.time() - t0

# Tally items per content type and per page in a single pass.
type_counts = Counter()
page_counts = Counter()
for item in extracted:
    type_counts[item.content_type.value] += 1
    page_counts[item.page_number] += 1

by_page = {f'p{k}': v for k, v in sorted(page_counts.items())}
print(f"Time            : {extract_time:.2f}s")
print(f"Items extracted : {len(extracted)}")
print(f"By type         : {dict(type_counts)}")
print(f"By page         : {by_page}")

# Unique header paths (the document's heading hierarchy)
headers = sorted({item.header_path for item in extracted if item.header_path})
print(f"\nUnique header paths ({len(headers)}):")
for h in headers:
    print(f"  {h}")

# Sample items
print("\nSample extracted items (first 5):")
for i, item in enumerate(extracted[:5]):
    ellipsis = '...' if len(item.content) > 120 else ''
    text_preview = item.content[:120].replace('\n', ' ') + ellipsis
    print(f"  [{i}] type={item.content_type.value:<18} page={item.page_number}  header='{item.header_path}'")
    print(f"       text: {text_preview}")

Time            : 0.03s
Items extracted : 98
By type         : {'image_description': 10, 'text': 84, 'table': 4}
By page         : {'p1': 67, 'p2': 13, 'p3': 4, 'p4': 14}

Unique header paths (13):
  A2 U.S. Dollar BlackRock Global Funds
  BlackRock Global Funds
  CALENDAR YEAR PERFORMANCE (%)
  CALENDAR YEAR PERFORMANCE (%) > KEY FACTS
  CUMULATIVE & ANNUALISED PERFORMANCE
  Contact Us
  FEES AND CHARGES *
  IMPORTANT INFORMATION:
  INVESTMENT OBJECTIVE
  PORTFOLIO CHARACTERISTICS
  PORTFOLIO MANAGERS
  SUSTAINABILITY CHARACTERISTICS
  SUSTAINABILITY CHARACTERISTICS DISCLOSURE:

Sample extracted items (first 5):
  [0] type=image_description  page=1  header='A2 U.S. Dollar BlackRock Global Funds'
       text: [Image]
  [1] type=text               page=1  header='A2 U.S. Dollar BlackRock Global Funds'
       text: December 2025
  [2] type=text               page=1  header='A2 U.S. Dollar BlackRock Global Funds'
       text: Unless otherwise stated, Performance, Portfolio Breakdowns and 

In [14]:
# Inspect the first extracted item; its repr shows content, content_type,
# page_number, header_path and metadata.
extracted[0]

ExtractedContent(content='[Image]', content_type=<ContentType.IMAGE_DESCRIPTION: 'image_description'>, page_number=1, header_path='A2 U.S. Dollar BlackRock Global Funds', metadata={'label': 'picture'})

In [22]:
# Try to render pictures directly from the Stage-1 parse result.
# The Docling document gets its own name: `doc` holds the database Document
# record, which the indexing and summary cells below still read
# (doc.id, doc.file_name, doc.version, doc.file_path).
from IPython.display import display
import PIL

stage1_docling_doc = parse_result.raw_document
for item, _level in stage1_docling_doc.iterate_items():
    label = item.label.value if hasattr(item, "label") else ""
    if label != "picture":
        continue
    img = item.get_image(stage1_docling_doc)
    if img:
        display(img)
    else:
        # Report missing image data instead of silently skipping the picture.
        page_no = item.prov[0].page_no if item.prov else "?"
        print(f"Picture on page {page_no}: (no image data)")

hello
hello
hello
hello
hello
hello
hello
hello
hello
hello


In [17]:
# Inspect tables and images from extraction.
# Partition the extracted items into tables and image descriptions in one pass.
tables = []
images = []
for entry in extracted:
    if entry.content_type == ContentType.TABLE:
        tables.append(entry)
    elif entry.content_type == ContentType.IMAGE_DESCRIPTION:
        images.append(entry)

print(f"Tables found: {len(tables)}")
for i, tbl in enumerate(tables[:2]):
    print(f"\n  Table {i} (page {tbl.page_number}, header: '{tbl.header_path}'):")
    # Indent every line of the (truncated) markdown table.
    print("\n".join(f"    {row}" for row in tbl.content[:400].split('\n')))
    if len(tbl.content) > 400:
        print(f"    ... ({len(tbl.content)} chars total)")

print(f"\nImage descriptions found: {len(images)}")
for i, img in enumerate(images[:5]):
    print(f"  [{i}] page={img.page_number}  header='{img.header_path}'  text='{img.content[:80]}'")

Tables found: 4

  Table 0 (page 1, header: 'CUMULATIVE & ANNUALISED PERFORMANCE'):
    |                              | CUMULATIVE (%)   | CUMULATIVE (%)   | CUMULATIVE (%)   | CUMULATIVE (%)   | CUMULATIVE (%)   | ANNUALISED (%p.a.)   | ANNUALISED (%p.a.)   | ANNUALISED (%p.a.)   |
    |------------------------------|------------------|------------------|------------------|------------------|------------------|----------------------|----------------------|----------------------|
    |     
    ... (1181 chars total)

  Table 1 (page 1, header: 'CALENDAR YEAR PERFORMANCE (%)'):
    |           |   2021 |   2022 |   2023 |   2024 |   2025 |
    |-----------|--------|--------|--------|--------|--------|
    | Fund      |   8.01 | -43.06 |  49.78 |  32.5  |  18    |
    | Benchmark |  27.36 | -31.07 |  51.02 |  27.46 |  30.16 |

Image descriptions found: 10
  [0] page=1  header='A2 U.S. Dollar BlackRock Global Funds'  text='[Image]'
  [1] page=1  header='CALENDAR YEAR PERFORMANCE (%)'  t

In [None]:
# Stage 3: TRANSFORM (cleaning, normalisation, markdown generation)
transformer = ContentTransformer(svc_config)
t0 = time.time()
transform_result = transformer.transform(extracted, raw_document=parse_result.raw_document)
transform_time = time.time() - t0

MARKDOWN_PREVIEW_CHARS = 1500  # how much of the generated markdown to print
COMPARE_INDEX = 1              # which item to show before/after transformation

print(f"Time              : {transform_time:.2f}s")
print(f"Items transformed : {len(transform_result.items)}")
print(f"Markdown length   : {len(transform_result.document_markdown):,} chars")

# Before/After comparison. Both lists must be long enough to index, otherwise
# the lookup into `transform_result.items` raises IndexError.
# NOTE(review): pairing by index assumes the transformer keeps items aligned
# with `extracted` (no drops/reorders); the count check flags when it visibly
# does not.
if len(extracted) > COMPARE_INDEX and len(transform_result.items) > COMPARE_INDEX:
    if len(extracted) != len(transform_result.items):
        print(f"\nNote: {len(extracted)} extracted vs {len(transform_result.items)} transformed items; "
              "index-based pairing below may not line up.")
    print(f"\nBefore/After comparison (item {COMPARE_INDEX}):")
    print(f"  Original   : '{extracted[COMPARE_INDEX].content[:100]}'")
    print(f"  Transformed: '{transform_result.items[COMPARE_INDEX].content[:100]}'")

# Show the generated markdown
document_markdown = transform_result.document_markdown
print(f"\nGenerated document markdown (first {MARKDOWN_PREVIEW_CHARS} chars):")
print("-" * 60)
print(document_markdown[:MARKDOWN_PREVIEW_CHARS])
print("-" * 60)
if len(document_markdown) > MARKDOWN_PREVIEW_CHARS:
    print(f"... ({len(document_markdown):,} chars total)")

In [None]:
# Stage 4: CHUNK (structure-aware chunking)
chunker = StructureAwareChunker(svc_config)
t0 = time.time()
chunks = chunker.chunk(transform_result.items)
chunk_time = time.time() - t0

token_counts = [c.token_count for c in chunks]
total_tokens = sum(token_counts)
chunk_type_counts = Counter(c.content_type.value for c in chunks)
chunking_cfg = svc_config.processing.chunking

print(f"Time          : {chunk_time:.2f}s")
print(f"Input items   : {len(transform_result.items)}")
print(f"Output chunks : {len(chunks)}")
print(f"Total tokens  : {total_tokens:,}")
print(f"Config        : size={chunking_cfg.chunk_size_tokens}, overlap={chunking_cfg.chunk_overlap_tokens}")
# min()/max() raise ValueError and the average divides by zero on an empty list.
if token_counts:
    print(f"Token range   : min={min(token_counts)}, max={max(token_counts)}, avg={total_tokens/len(token_counts):.0f}")
else:
    print("Token range   : n/a (no chunks produced)")
print(f"By type       : {dict(chunk_type_counts)}")

# Full chunk listing
print(f"\nAll {len(chunks)} chunks:")
print(f"{'Idx':>4}  {'Type':<18}  {'Page':>4}  {'Tokens':>6}  {'Header Path':<40}  Content preview")
print("-" * 140)
for c in chunks:
    text_preview = c.content[:60].replace('\n', ' ')
    if len(c.content) > 60:
        text_preview += '...'
    # Formatting None with a width spec raises TypeError; the extraction cell
    # filters on `if item.header_path`, so empty/missing values are plausible.
    page = "" if c.page_number is None else c.page_number
    header = c.header_path or ""
    print(f"{c.chunk_index:>4}  {c.content_type.value:<18}  {page:>4}  "
          f"{c.token_count:>6}  {header:<40}  {text_preview}")

In [None]:
# Stage 5: EMBED (sentence-transformers)
embedder = Embedder(svc_config)
t0 = time.time()
embed_result = embedder.embed(chunks)
embed_time = time.time() - t0

embedded_chunks = embed_result.embedded_chunks
print(f"Time          : {embed_time:.2f}s")
print(f"Model         : {svc_config.processing.embedding.model}")
print(f"Batch size    : {svc_config.processing.embedding.batch_size}")
print(f"Embedded      : {len(embedded_chunks)}")
print(f"Skipped       : {embed_result.skipped_count}")

# Bind `dim` unconditionally. The summary cell reads it, so leaving it unset
# when nothing was embedded would raise NameError there. After a re-run it
# would instead silently show a stale value from an earlier run.
dim = len(embedded_chunks[0].embedding) if embedded_chunks else None

if embedded_chunks:
    print(f"Dimension     : {dim}")
    print(f"\nSample embedding (chunk 0, first 10 values):")
    print(f"  {embedded_chunks[0].embedding[:10]}")
    if len(embedded_chunks) > 1:
        print(f"Sample embedding (chunk 1, first 10 values):")
        print(f"  {embedded_chunks[1].embedding[:10]}")
else:
    print("Dimension     : n/a (no chunks were embedded)")

In [None]:
# Stage 6: INDEX (ChromaDB upsert)
import chromadb

indexer = ChromaIndexer(svc_config)
indexer.delete_document(doc.id)  # idempotent cleanup

t0 = time.time()
index_result = indexer.index(
    embed_result.embedded_chunks,
    document_id=doc.id,
    file_name=doc.file_name,
    version=doc.version,
    file_path=doc.file_path,
)
index_time = time.time() - t0

print(f"Time          : {index_time:.2f}s")
print(f"Collection    : {index_result.collection_name}")
print(f"Indexed       : {index_result.total_indexed} chunks")
print(f"Sample IDs    : {index_result.indexed_ids[:5]}")

# Verify via ChromaDB. Read back chunks by the IDs just written: `peek()` only
# returns the first entries in the collection, which may belong to other
# documents or earlier versions sharing the same collection.
chroma_path = Path(svc_config.indexing.chromadb_path).resolve()
client = chromadb.PersistentClient(path=str(chroma_path))
coll = client.get_collection(index_result.collection_name)
print(f"\nChromaDB verification: '{coll.name}' has {coll.count()} total chunks")

sample_ids = list(index_result.indexed_ids[:3])
if sample_ids:
    results = coll.get(ids=sample_ids, include=["documents", "metadatas"])
    print(f"\nSample stored chunks for this document "
          f"({len(results['ids'])} of {len(sample_ids)} requested IDs found):")
    for i, (text, meta) in enumerate(zip(results["documents"] or [], results["metadatas"] or [])):
        meta = meta or {}  # Chroma may store entries without metadata
        print(f"  [{i}] type={meta.get('content_type')}, page={meta.get('page_number')}, "
              f"header='{meta.get('header_path', '')}'")
        print(f"      {(text or '')[:80]}...")
else:
    print("\nNo chunks were indexed for this document.")

In [None]:
# Timing summary + pipeline overview
stage_times = [("Parse", parse_time), ("Extract", extract_time),
               ("Transform", transform_time), ("Chunk", chunk_time),
               ("Embed", embed_time), ("Index", index_time)]
total_time = sum(t for _, t in stage_times)
BAR_WIDTH = 40  # characters for a stage that takes 100% of the total time

print("TIMING SUMMARY")
print("=" * 50)
for name, t in stage_times:
    # Guard against division by zero when every stage reports 0s.
    share = t / total_time if total_time > 0 else 0.0
    bar = '*' * max(1, int(share * BAR_WIDTH))
    print(f"  {name:<12} {t:>7.2f}s  {bar}")
print(f"  {'TOTAL':<12} {total_time:>7.2f}s")

# Derive the embedding dimension here rather than relying on a `dim` global
# from the embed cell, which may be unset or left over from an earlier run.
embedded_chunks_for_summary = embed_result.embedded_chunks
embedding_dim = (
    len(embedded_chunks_for_summary[0].embedding) if embedded_chunks_for_summary else "n/a"
)

print(f"\nPIPELINE SUMMARY")
print("=" * 50)
print(f"  Document        : {doc.file_name} (v{doc.version})")
print(f"  Workflow        : {wf_config.name}")
print(f"  Pages           : {parse_result.page_count}")
print(f"  Extracted items : {len(extracted)}")
print(f"    text          : {type_counts.get('text', 0)}")
print(f"    table         : {type_counts.get('table', 0)}")
print(f"    image_desc    : {type_counts.get('image_description', 0)}")
print(f"  Chunks          : {len(chunks)}")
print(f"  Total tokens    : {total_tokens:,}")
print(f"  Embedding dim   : {embedding_dim}")
print(f"  Collection      : {index_result.collection_name}")
print(f"  Indexed         : {index_result.total_indexed}")
print(f"\nDemo complete!")