# RAG pipeline (Cloudflare Workers AI + Vectorize)

This notebook ingests PDF documents, chunks them, generates embeddings using Cloudflare Workers AI, and stores vectors in Cloudflare Vectorize.

- Embedding model: `@cf/baai/bge-base-en-v1.5` (768-dim)
- Vector DB: Cloudflare Vectorize (REST API)
- Input: PDFs under `../data/pdf_files`


In [1]:
# Imports and setup
import os
import time
import uuid
import requests
import numpy as np
from pathlib import Path
from typing import List, Any
from dotenv import load_dotenv

from langchain_community.document_loaders import PyPDFLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter

# Optional progress bar
try:
    from tqdm import tqdm
except Exception:
    def tqdm(x, **kwargs):
        return x

load_dotenv()

# Environment configuration
CLOUDFLARE_ACCOUNT_ID = os.getenv("CLOUDFLARE_ACCOUNT_ID", "<YOUR_ACCOUNT_ID>")
CLOUDFLARE_API_TOKEN = os.getenv("CLOUDFLARE_API_TOKEN", "<YOUR_API_TOKEN>")
VECTORIZE_INDEX_NAME = os.getenv("VECTORIZE_INDEX_NAME", "gst-rag-worker")

# Embedding model per Cloudflare docs
CF_EMBEDDINGS_MODEL = "@cf/baai/bge-base-en-v1.5"  # 768-dim

if any(v.startswith("<YOUR_") for v in [CLOUDFLARE_ACCOUNT_ID, CLOUDFLARE_API_TOKEN]):
    print("WARNING: Set CLOUDFLARE_ACCOUNT_ID and CLOUDFLARE_API_TOKEN in your environment or edit this cell.")


  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# PDF discovery and loading

def process_all_pdfs(pdf_directory: str) -> List[Any]:
    """Load all PDFs found under a directory (recursive), returning LangChain Documents."""
    all_documents = []
    pdf_dir = Path(pdf_directory)
    pdf_files = list(pdf_dir.glob("**/*.pdf"))
    print(f"Found {len(pdf_files)} PDF files to process")

    for pdf_file in pdf_files:
        print(f"\nProcessing: {pdf_file.name}")
        try:
            loader = PyPDFLoader(str(pdf_file))
            documents = loader.load()
            for doc in documents:
                doc.metadata['source_file'] = pdf_file.name
                doc.metadata['file_type'] = 'pdf'
            all_documents.extend(documents)
            print(f"  ✓ Loaded {len(documents)} pages")
        except Exception as e:
            print(f"  ✗ Error: {e}")

    print(f"\nTotal documents loaded: {len(all_documents)}")
    return all_documents

all_pdf_documents = process_all_pdfs("../data/pdf_files")
len(all_pdf_documents)


Found 6 PDF files to process

Processing: Bill's_Windsurf_Shop_Invoice.pdf
  ✓ Loaded 1 pages

Processing: Amy's_Bird_Sanctuary_Invoice.pdf
  ✓ Loaded 1 pages

Processing: Interim-Presentation.pdf
  ✓ Loaded 71 pages

Processing: Cool_Cars_Invoice.pdf
  ✓ Loaded 1 pages

Processing: Dukes_Basketball_Camp_Invoice.pdf
  ✓ Loaded 1 pages

Processing: Diego_Rodriguez_Invoice.pdf
  ✓ Loaded 1 pages

Total documents loaded: 76


76

In [3]:
# Chunking

def split_documents(documents: List[Any], chunk_size=1000, chunk_overlap=200):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=chunk_overlap,
        length_function=len,
        separators=["\n\n", "\n", " ", ""],
    )
    split_docs = text_splitter.split_documents(documents)
    print(f"Split {len(documents)} documents into {len(split_docs)} chunks")
    if split_docs:
        print("Example chunk preview:")
        print(split_docs[0].page_content[:200], "...")
        print(split_docs[0].metadata)
    return split_docs

chunks = split_documents(all_pdf_documents)
len(chunks)


Split 76 documents into 93 chunks
Example chunk preview:
Invoice for Bill's Windsurf Shop
Email: Surf@Intuit.com
Invoice Details:
Description Qty Unit Price Amount
Design Service 1 $500.00 $500.00
Consulting 2 $200.00 $400.00
Installation 1 $300.00 $300.00
 ...
{'producer': 'PyPDF2', 'creator': 'PyPDF', 'creationdate': '', 'source': "../data/pdf_files/Bill's_Windsurf_Shop_Invoice.pdf", 'total_pages': 1, 'page': 0, 'page_label': '1', 'source_file': "Bill's_Windsurf_Shop_Invoice.pdf", 'file_type': 'pdf'}


93

In [4]:
# Cloudflare Workers AI Embeddings (REST client)

class CFWorkersAIEmbeddings:
    """
    Minimal client for Workers AI embeddings endpoint.
    POST https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/{model}
    Body: {"text": "<string>"}
    Returns: {"result": {"data": [[...]]}} or {"data": [...]}
    """

    def __init__(self, account_id: str, api_token: str, model: str):
        self.base = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/ai/run/{model}"
        self.headers = {
            "Authorization": f"Bearer {api_token}",
            "Content-Type": "application/json",
        }

    def embed_one(self, text: str, retries: int = 3, backoff: float = 1.5) -> np.ndarray:
        payload = {"text": text}
        last_err = None
        for attempt in range(1, retries + 1):
            try:
                r = requests.post(self.base, headers=self.headers, json=payload, timeout=60)
                if r.status_code == 200:
                    data = r.json()
                    vec = None
                    if isinstance(data, dict) and "result" in data:
                        result = data["result"]
                        if "data" in result and result["data"]:
                            first = result["data"][0]
                            vec = first if isinstance(first, list) else result["data"]
                    elif "data" in data:
                        first = data["data"][0] if isinstance(data["data"], list) and data["data"] and isinstance(data["data"][0], list) else data["data"]
                        vec = first
                    if vec is None:
                        raise ValueError(f"Unexpected response structure: {data}")
                    return np.array(vec, dtype=np.float32)
                else:
                    last_err = RuntimeError(f"HTTP {r.status_code}: {r.text[:300]}")
            except Exception as e:
                last_err = e
            time.sleep(backoff ** (attempt - 1))
        raise last_err

    def embed_batch(self, texts: List[str]) -> np.ndarray:
        vectors = []
        for t in tqdm(texts, desc="Embedding with Workers AI"):
            vectors.append(self.embed_one(t))
        return np.vstack(vectors)

cf_embedder = CFWorkersAIEmbeddings(
    account_id=CLOUDFLARE_ACCOUNT_ID,
    api_token=CLOUDFLARE_API_TOKEN,
    model=CF_EMBEDDINGS_MODEL,
)


In [5]:
# Cloudflare Vectorize client (REST)

class CFVectorize:
    """
    Minimal client for Cloudflare Vectorize upsert and query.
    Upsert: POST /client/v4/accounts/{account_id}/vectorize/indexes/{index_name}/upsert
    Query:  POST /client/v4/accounts/{account_id}/vectorize/indexes/{index_name}/query
    """

    def __init__(self, account_id: str, api_token: str, index_name: str):
        self.base = f"https://api.cloudflare.com/client/v4/accounts/{account_id}/vectorize/indexes/{index_name}"
        # Only set Authorization; let 'requests' set Content-Type for JSON
        self.headers = {
            "Authorization": f"Bearer {api_token}",
        }

    def upsert(self, vectors: List[dict]):
        url = f"{self.base}/upsert"
        r = requests.post(url, headers=self.headers, json={"vectors": vectors}, timeout=60)
        if r.status_code != 200:
            raise RuntimeError(f"Vectorize upsert failed: HTTP {r.status_code}: {r.text[:300]}")
        return r.json()

    def query(self, vector: List[float], top_k: int = 5, include_vectors: bool = False):
        url = f"{self.base}/query"
        body = {"vector": vector, "topK": top_k, "includeVectors": include_vectors}
        r = requests.post(url, headers=self.headers, json=body, timeout=60)
        if r.status_code != 200:
            raise RuntimeError(f"Vectorize query failed: HTTP {r.status_code}: {r.text[:300]}")
        return r.json()

vectorize = CFVectorize(
    account_id=CLOUDFLARE_ACCOUNT_ID,
    api_token=CLOUDFLARE_API_TOKEN,
    index_name=VECTORIZE_INDEX_NAME,
)


In [6]:
# Generate embeddings for chunks and upsert to Vectorize

texts = [c.page_content for c in chunks]
print(f"Generating embeddings for {len(texts)} chunks using Workers AI model: {CF_EMBEDDINGS_MODEL}")
embeddings = cf_embedder.embed_batch(texts)
print("Embeddings shape:", embeddings.shape)

# Prepare vectors payload with metadata
vectors = []
for i, (doc, vec) in enumerate(zip(chunks, embeddings)):
    vec_id = f"doc_{uuid.uuid4().hex[:8]}_{i}"
    meta = dict(doc.metadata)
    meta["doc_index"] = i
    meta["content_length"] = len(doc.page_content)
    vectors.append({
        "id": vec_id,
        "values": vec.astype(float).tolist(),
        "metadata": meta,
    })

print(f"Upserting {len(vectors)} vectors to Vectorize index '{VECTORIZE_INDEX_NAME}' ...")
upsert_result = vectorize.upsert(vectors)
upsert_result


Generating embeddings for 93 chunks using Workers AI model: @cf/baai/bge-base-en-v1.5


Embedding with Workers AI: 100%|██████████| 93/93 [00:26<00:00,  3.46it/s]


Embeddings shape: (93, 768)
Upserting 93 vectors to Vectorize index 'gst-regulations-index' ...


RuntimeError: Vectorize upsert failed: HTTP 400: {
  "result": null,
  "success": false,
  "errors": [
    {
      "code": 1005,
      "message": "vectorize.unknown_content_type"
    }
  ],
  "messages": []
}


In [None]:
# Optional: quick similarity query to validate ingestion

def query_similar(text: str, top_k: int = 3):
    q_vec = cf_embedder.embed_one(text).astype(float).tolist()
    res = vectorize.query(q_vec, top_k=top_k, include_vectors=False)
    return res

result = query_similar("What is in the invoice for Bill's Windsurf Shop?", top_k=3)
result


In [None]:
# Diagnostics: Verify Cloudflare credentials and Vectorize access
import json

CF_API_BASE = f"https://api.cloudflare.com/client/v4"
COMMON_HEADERS = {
    "Authorization": f"Bearer {CLOUDFLARE_API_TOKEN}",
    "Content-Type": "application/json",
}

# 1) Verify token
verify_resp = requests.get(f"{CF_API_BASE}/user/tokens/verify", headers=COMMON_HEADERS, timeout=30)
print("Token verify status:", verify_resp.status_code)
try:
    print(json.dumps(verify_resp.json(), indent=2)[:800])
except Exception as e:
    print("Token verify parse error:", e, verify_resp.text[:300])

# 2) Check index exists
index_url = f"{CF_API_BASE}/accounts/{CLOUDFLARE_ACCOUNT_ID}/vectorize/indexes/{VECTORIZE_INDEX_NAME}"
idx_resp = requests.get(index_url, headers=COMMON_HEADERS, timeout=30)
print("Index get status:", idx_resp.status_code)
try:
    print(json.dumps(idx_resp.json(), indent=2)[:800])
except Exception as e:
    print("Index get parse error:", e, idx_resp.text[:300])

# 3) Try a minimal upsert (single small vector) to isolate auth/scopes
try:
    test_vec = {
        "id": "diag_test",
        "values": [0.0]*768,
        "metadata": {"_diag": True},
    }
    upsert_url = f"{CF_API_BASE}/accounts/{CLOUDFLARE_ACCOUNT_ID}/vectorize/indexes/{VECTORIZE_INDEX_NAME}/upsert"
    upsert_resp = requests.post(upsert_url, headers=COMMON_HEADERS, json={"vectors": [test_vec]}, timeout=30)
    print("Upsert test status:", upsert_resp.status_code)
    print(upsert_resp.text[:800])
except Exception as e:
    print("Upsert test exception:", e)


Token verify status: 200
{
  "result": {
    "id": "358de5b477464356f9d512c2744f89c1",
    "status": "active"
  },
  "success": true,
  "errors": [],
  "messages": [
    {
      "code": 10000,
      "message": "This API Token is valid and active",
      "type": null
    }
  ]
}
Index get status: 404
{
  "result": null,
  "success": false,
  "errors": [
    {
      "code": 3000,
      "message": "vectorize.index.not_found - Index name \"gst-regulations-index\""
    }
  ],
  "messages": []
}
Upsert test status: 400
{
  "result": null,
  "success": false,
  "errors": [
    {
      "code": 1005,
      "message": "vectorize.unknown_content_type"
    }
  ],
  "messages": []
}

