Skip to content

Add example DAGs for LlamaIndex RAG pipelines#67189

Closed
vikramkoka wants to merge 1 commit into
mainfrom
aip99-llamaindex-example
Closed

Add example DAGs for LlamaIndex RAG pipelines#67189
vikramkoka wants to merge 1 commit into
mainfrom
aip99-llamaindex-example

Conversation

@vikramkoka
Copy link
Copy Markdown
Contributor

@vikramkoka vikramkoka commented May 19, 2026

  • Adds example_llamaindex_rag.py with three example DAGs demonstrating RAG patterns using the new LlamaIndex operators

    • Full RAG pipeline: DocumentLoaderOperator → EmbeddingOperator → RetrievalOperator → LLMOperator
    • Separate index/query DAGs: weekly PDF indexing DAG + on-demand parameterized query DAG (production pattern)
    • Multi-source RAG: combines CSV and text files with metadata tagging, merges via @task, then embeds

    Dependencies

    Requires PR Add DocumentLoaderOperator to common.ai provider #67120 (DocumentLoaderOperator) and PR Add LlamaIndex operators to common.ai provider #67121 (LlamaIndex operators) to merge first.

    Test plan

    • Verify DAG file parses without errors after dependency PRs merge
    • Verify all three DAGs appear in the Airflow UI
    • Test full RAG pipeline end-to-end with sample text files and an OpenAI connection
    • Test parameterized query DAG with custom question parameter

Was generative AI tooling used to co-author this PR?
  • Yes (please specify the tool below)

  • Read the Pull Request Guidelines for more information. Note: commit author/co-author name and email in commits become permanently public when merged.
  • For fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
  • When adding dependency, check compliance with the ASF 3rd Party License Policy.
  • For significant user-facing changes create newsfragment: {pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.

- Adds `example_llamaindex_rag.py` with three example DAGs demonstrating RAG patterns using the new LlamaIndex operators
  - **Full RAG pipeline**: DocumentLoaderOperator → EmbeddingOperator → RetrievalOperator → LLMOperator
  - **Separate index/query DAGs**: weekly PDF indexing DAG + on-demand parameterized query DAG (production pattern)
  - **Multi-source RAG**: combines CSV and text files with metadata tagging, merges via @task, then embeds

  ## Dependencies

  Requires PR #67120 (DocumentLoaderOperator) and PR #67121 (LlamaIndex operators) to merge first.

  ## Test plan

  - [ ] Verify DAG file parses without errors after dependency PRs merge
  - [ ] Verify all three DAGs appear in the Airflow UI
  - [ ] Test full RAG pipeline end-to-end with sample text files and an OpenAI connection
  - [ ] Test parameterized query DAG with custom `question` parameter
kaxil added a commit that referenced this pull request May 21, 2026
… examples

Per Kaxil's review r3267387604: ``RetrievalOperator`` / ``EmbeddingOperator``
are too generic in the common.ai namespace -- they risk colliding when
other frameworks add their own embedding/retrieval operators. Renamed
both with the LlamaIndex prefix:

- ``EmbeddingOperator`` -> ``LlamaIndexEmbeddingOperator``
- ``RetrievalOperator`` -> ``LlamaIndexRetrievalOperator``

Renames applied across the two operator modules, three docs RSTs, the
two test files, both example DAGs, and the cross-refs in
``docs/operators/index.rst``, ``docs/hooks/llamaindex.rst``,
``docs/operators/document_loader.rst``, and ``docs/hooks/index.rst``.

Folds in #67189 (``example_llamaindex_rag.py``) which would otherwise
sit blocked waiting for this PR to merge. Rewritten for the new API:

- Uses the renamed classes
- Drops ``documents="{{ ti.xcom_pull(...) }}"`` Jinja templating
  (template_fields removed; bind via ``loader.output`` direct)
- Switches LlamaIndex operators to ``llamaindex_default`` conn (was
  ``pydanticai_default``); the synthesis-step ``LLMOperator`` keeps
  ``pydanticai_default`` because it's pydantic-ai-backed (different
  framework, intentional split documented in the module docstring)
- Adds explicit ``embed_model="text-embedding-3-small"`` to every
  embedding/retrieval call (new operator validation requires it)
- Fixes the string-reference task chains (``load >> "build_index"`` ->
  ``load >> build_index``) which weren't valid task dependencies

Closes #67189.
@kaxil
Copy link
Copy Markdown
Member

kaxil commented May 21, 2026

Folded into #67121 as part of the comprehensive LlamaIndex rewrite -- closing in favor of that.

The example_llamaindex_rag.py file landed in apache/airflow:aip99-llamaindex at 82fa97b with the following adjustments to match the rewritten operator API:

  • Uses the renamed LlamaIndexEmbeddingOperator / LlamaIndexRetrievalOperator classes (per Kaxil's r3267387604 -- generic *Operator names risked collision with future framework operators).
  • Drops documents="{{ ti.xcom_pull(...) }}" Jinja templating -- template_fields was removed because list[dict] doesn't survive Jinja stringification and templating doc text would also expand {{ var.value.api_key }} tokens inside user documents (secret-leak vector). Binds via loader.output direct instead.
  • LlamaIndex operators use the new llamaindex_default conn (was pydanticai_default); the synthesis-step LLMOperator keeps pydanticai_default because it's pydantic-ai-backed (different framework, intentional split documented in the module docstring).
  • Adds explicit embed_model="text-embedding-3-small" to each embedding/retrieval call -- the rewritten operator validates that embed_model is set (either on the operator, or via extra["embed_model"] on the connection).
  • Fixes the string-reference task chains (load >> "build_index" -> load >> build_index) which weren't valid task dependencies.

Thanks for the example DAGs -- they're the load-bearing demos for the LlamaIndex RAG story.

@kaxil kaxil closed this May 21, 2026
kaxil added a commit that referenced this pull request May 21, 2026
* Add LlamaIndex operators to common.ai provider

 - Adds LlamaIndexHook to bridge Airflow connections to LlamaIndex's Settings singleton. Reuses the pydanticai connection type, supports separate
  embedding and LLM connections.
  - Adds EmbeddingOperator to chunk documents and produce embedding vectors via LlamaIndex's SentenceSplitter. Input is list[dict(text, metadata)]
  (same shape as DocumentLoaderOperator output), output includes chunks with vectors ready for downstream vector store ingest operators (pgvector,
  Pinecone, Weaviate).
  - Adds RetrievalOperator to load a persisted LlamaIndex index and perform similarity search. Output is scored chunks ready for synthesis via
  LLMOperator.

  Design notes

  All LlamaIndex imports are lazy (inside execute() / method bodies), so modules parse without llama-index installed. The hook currently hardcodes
  OpenAI embedding/LLM providers; a follow-up PR will refactor to use BaseAIHook for provider-agnostic model resolution when it lands.

  What's included

  ┌─────────────────────────────────────────┬──────────────────────────────────────────┐
  │                  File                   │                 Purpose                  │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ hooks/llamaindex.py                     │ Hook (~110 lines)                        │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ operators/llamaindex_embedding.py       │ EmbeddingOperator (~110 lines)           │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ operators/llamaindex_retrieval.py       │ RetrievalOperator (~90 lines)            │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ tests/.../test_llamaindex.py            │ 12 hook tests                            │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ tests/.../test_llamaindex_embedding.py  │ 10 operator tests                        │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ tests/.../test_llamaindex_retrieval.py  │ 8 operator tests                         │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ docs/hooks/llamaindex.rst               │ Hook docs                                │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ docs/operators/llamaindex_embedding.rst │ EmbeddingOperator docs                   │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ docs/operators/llamaindex_retrieval.rst │ RetrievalOperator docs                   │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ provider.yaml                           │ Integration, hook, operator registration │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ docs/index.rst                          │ LlamaIndex Hook in Guides toctree        │
  ├─────────────────────────────────────────┼──────────────────────────────────────────┤
  │ docs/operators/index.rst                │ Chooser table rows                       │
  └─────────────────────────────────────────┴──────────────────────────────────────────┘

  Test plan

  - uv run --project providers/common/ai pytest providers/common/ai/tests/unit/common/ai/hooks/test_llamaindex.py -xvs (12 tests)
  - uv run --project providers/common/ai pytest providers/common/ai/tests/unit/common/ai/operators/test_llamaindex_embedding.py
  providers/common/ai/tests/unit/common/ai/operators/test_llamaindex_retrieval.py -xvs (18 tests)
  - Hook: init defaults, separate embed_conn_id, connection kwargs extraction, embedding model, LLM, Settings configuration
  - EmbeddingOperator: output shape, chunking, index persistence, vector inclusion/omission, splitter params
  - RetrievalOperator: output shape, chunk keys, top_k forwarding, multiple results, storage context

  ---
  Was generative AI tooling used to co-author this PR?

  - Yes — Claude Code (Opus 4.6)

  Generated-by: Claude Code (Opus 4.6) following
  https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions

* Refactor LlamaIndex hook + operators: no Settings mutation, BYO models, cloud URIs

Same playbook as #67192 (LangChain) and #67120 (DocumentLoader) plus
three LlamaIndex-specific architectural fixes:

Critical fixes
- Stop mutating LlamaIndex's global ``Settings`` singleton. The previous
  ``LlamaIndexHook.configure_settings()`` wrote ``Settings.embed_model``
  / ``Settings.llm`` process-wide, which leaks across concurrent tasks
  in the same worker. Replaced with per-call ``embed_model=`` /
  ``llm=`` parameters on ``VectorStoreIndex(...)`` and
  ``load_index_from_storage(...)``.
- Own ``llamaindex`` connection type instead of squatting on
  ``pydanticai``. Mirrors the LangChain / CrewAI fix.
- Remove ``documents`` from ``EmbeddingOperator.template_fields``.
  ``list[dict]`` doesn't survive Jinja stringification, and worse, a
  user document containing literal ``{{ var.value.api_key }}`` would
  leak secrets into the embedding store. Bind via ``loader.output``
  instead.

BYO embedding/LLM for non-OpenAI vendors
- LlamaIndex doesn't ship an ``init_chat_model`` / ``init_embedding_model``
  equivalent (verified in ``llama_index.core.embeddings.utils.resolve_embed_model``
  -- only ``"default"`` / ``"local"`` / ``"clip:"`` dispatch). The hook
  therefore covers OpenAI (matching LlamaIndex's own
  ``resolve_embed_model("default")`` behaviour) and operators accept a
  pre-built ``BaseEmbedding`` / ``LLM`` instance to bypass the hook for
  Cohere / Bedrock / Vertex / HuggingFace / etc.

Cloud-URI persistence
- ``EmbeddingOperator.persist_dir`` and
  ``RetrievalOperator.index_persist_dir`` accept storage URIs
  (``s3://``, ``gs://``, ``azure://``) resolved via
  ``ObjectStoragePath`` and fsspec, matching the merged
  ``DocumentLoaderOperator`` pattern.

Hook plumbing playbook (mirrors LangChain / CrewAI / DocumentLoader)
- ``conn_type = "llamaindex"`` + new ``connection-types`` entry in
  ``provider.yaml`` with ``embed_model`` / ``llm_model`` conn-fields.
- ``default_conn_name`` resolves at runtime via
  ``llm_conn_id: str | None = None``.
- ``_resolve_model`` honours ``conn.extra_dejson`` for parity with the
  sibling hooks (swallows ``JSONDecodeError``, applies secret masking).
- ``get_ui_field_behaviour`` added.
- ``[llamaindex]`` extra in ``pyproject.toml`` pinning
  ``llama-index-core``, ``llama-index-embeddings-openai``,
  ``llama-index-llms-openai`` (enough to back the hook's default
  OpenAI return values). Same in the ``dev`` group.

Misc operator/test fixes
- Wrap lazy ``llama_index`` imports with
  ``AirflowOptionalProviderFeatureException`` so missing extras surface
  cleanly.
- ``RetrievalOperator`` returns ``{"query": ..., "chunks": [...]}``
  (was ``"question"``) and ``chunks[*].node_id`` (was the misleading
  ``"source"`` key).
- ``RetrievalOperator`` raises ``FileNotFoundError`` with a "did you
  run EmbeddingOperator first?" hint when ``index_persist_dir`` is
  missing.
- All three test files get an autouse fixture stubbing
  ``llama_index.*`` in ``sys.modules`` so ``@patch`` resolves without
  ``llama-index-*`` packages installed in CI's non-DB test env
  (mirrors #67237).
- New ``example_llamaindex_hook.py`` with ``[START howto_*]`` markers
  for the docs to ``exampleinclude``.

* Rename LlamaIndex operators with framework prefix; fold in #67189 RAG examples

Per Kaxil's review r3267387604: ``RetrievalOperator`` / ``EmbeddingOperator``
are too generic in the common.ai namespace -- they risk colliding when
other frameworks add their own embedding/retrieval operators. Renamed
both with the LlamaIndex prefix:

- ``EmbeddingOperator`` -> ``LlamaIndexEmbeddingOperator``
- ``RetrievalOperator`` -> ``LlamaIndexRetrievalOperator``

Renames applied across the two operator modules, three docs RSTs, the
two test files, both example DAGs, and the cross-refs in
``docs/operators/index.rst``, ``docs/hooks/llamaindex.rst``,
``docs/operators/document_loader.rst``, and ``docs/hooks/index.rst``.

Folds in #67189 (``example_llamaindex_rag.py``) which would otherwise
sit blocked waiting for this PR to merge. Rewritten for the new API:

- Uses the renamed classes
- Drops ``documents="{{ ti.xcom_pull(...) }}"`` Jinja templating
  (template_fields removed; bind via ``loader.output`` direct)
- Switches LlamaIndex operators to ``llamaindex_default`` conn (was
  ``pydanticai_default``); the synthesis-step ``LLMOperator`` keeps
  ``pydanticai_default`` because it's pydantic-ai-backed (different
  framework, intentional split documented in the module docstring)
- Adds explicit ``embed_model="text-embedding-3-small"`` to every
  embedding/retrieval call (new operator validation requires it)
- Fixes the string-reference task chains (``load >> "build_index"`` ->
  ``load >> build_index``) which weren't valid task dependencies

Closes #67189.

* Address code-review findings on LlamaIndex operators

- Fix ObjectStoragePath conn_id mangling: pass raw URI to LlamaIndex
  persist_dir= and supply target.fs separately. str(target) returns
  s3://<conn_id>@<bucket>/..., which fsspec misinterprets.
- Add documents / embed_model / embed_conn_id to template_fields so
  XComArg resolution fires. The previous "list[dict] doesn't survive
  stringification" rationale was wrong; Templater unwraps resolvables
  before Jinja.
- Default llm_conn_id to None on both operators; LlamaIndexHook
  resolves to default_conn_name at runtime. Hard-coding
  "llamaindex_default" undid the hook's careful runtime resolution.
- Add embed_conn_id pass-through for separate embedding credentials.
- Replace isinstance(str) duck-typing with hasattr-based BaseEmbedding
  check; raise TypeError with a clear pointer instead of letting an
  unresolved XComArg or random object explode later.
- Hoist 'import os' and 'from pathlib import Path' to module top.
- Pad RST title underlines and refresh docs/tests to match the new
  surface.

* Fix mypy on LlamaIndex embedding operator

- Pass persist_dir as a typed str arg to _persist so the existing
  None-narrowing # type: ignore comments can go away.
- Cast SentenceSplitter nodes to list[TextNode] for the .text access:
  the splitter only ever returns TextNode, but the base
  get_nodes_from_documents signature is typed as list[BaseNode].

* Install llama-index in tests instead of stubbing sys.modules

llama-index-core / -embeddings-openai / -llms-openai were declared in
the common.ai provider's dev dependency group but missing from uv.lock,
so CI never actually installed them. The tests papered over that by
faking out llama_index.* in sys.modules with MagicMocks.

Refresh uv.lock so the packages get installed, then drop the
sys.modules manipulation:

- test_llamaindex.py: remove the autouse _stub_llama_index_modules
  fixture entirely; @patch resolves against the real modules.
- test_llamaindex_embedding.py / test_llamaindex_retrieval.py: replace
  the _stub_li fixture (sys.modules setitem) with a smaller _li fixture
  that uses monkeypatch.setattr against real llama_index.core symbols.

* Apply ruff lint/format fixes

---------

Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants