Add DocumentLoaderOperator to common.ai provider#67120
Merged
Merged
Conversation
kaxil
reviewed
May 18, 2026
5 tasks
- Adds DocumentLoaderOperator, a framework-agnostic file parser that bridges Airflow's connectivity layer (hooks returning bytes/files) and the AI embedding layer (operators needing list[dict(text, metadata)]). No LlamaIndex, LangChain, or other AI framework dependency. - Built-in parsers for .txt, .md, .csv, .json with zero extra deps. PDF (via pypdf, BSD) and DOCX (via python-docx, MIT) available as optional extras: pip install apache-airflow-providers-common-ai[pdf] / [docx]. - Supports two input modes: source_path (local file, directory, or glob pattern) and source_bytes (raw bytes from XCom). Output is list[dict(text, metadata)], the same shape consumed by downstream embedding operators. Motivation File parsing is the highest-volume gap in Airflow's AI story Every RAG pipeline on Airflow currently requires custom parsing code. This operator makes it a single line in a Dag. What's included ┌────────────────────────────────────┬───────────────────────────────────────────┐ │ File │ Purpose │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ operators/document_loader.py │ Operator (~270 lines) │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ tests/.../test_document_loader.py │ 26 unit tests │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ docs/operators/document_loader.rst │ Usage docs │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ provider.yaml │ Operator registration + how-to-guide link │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ pyproject.toml │ [pdf] and [docx] optional dependencies │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ docs/operators/index.rst │ Chooser table row │ └────────────────────────────────────┴───────────────────────────────────────────┘ Test plan - uv run --project providers/common/ai pytest providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py -xvs (26 tests) - Built-in parsers: txt, md, csv (one doc per row), json (single object and array) - PDF/DOCX parsers: mocked via sys.modules injection (packages not installed in test env) - ImportError guidance when optional packages are missing - Init validation: mutual exclusion of source_path/source_bytes, file_type required with source_bytes - File discovery: glob patterns, extension filtering, empty directories - Output shape: every item has text and metadata, file_name/file_path in metadata, custom metadata_fields merged
Addressed Kaxil's feedback on PR. thank you Kaxil - Remove source_bytes from template_fields (Jinja breaks bytes) - Use `is not None` validation instead of truthiness checks - Raise FileNotFoundError when no files match source_path - Normalize file_extensions filter to case-insensitive - Fix temp file leak when write fails before try block - Return unquoted text for JSON string primitives - Use AirflowOptionalProviderFeatureException for missing extras - Document DOCX paragraph-only extraction limitation - Rewrite XCom docs example to @task pattern for source_bytes - Update tests for all behavioral changes (30 tests pass)
…rules Rebases onto main to recover the 0.3.0 release entries that were rolled back on the original branch, and applies the review feedback the user- side review surfaced. Operator - Replace the temp-file dance for PDF/DOCX bytes with in-memory streams. ``pypdf.PdfReader`` and ``docx.Document`` both accept binary streams, so ``source_bytes`` now goes through ``io.BytesIO`` directly. No more ``NamedTemporaryFile(delete=False)`` + ``os.unlink``. - Add ``encoding`` and ``encoding_errors`` parameters for non-UTF-8 input (Windows-1252 CSVs, files with a leading byte-order mark, ...). Failed decodes raise a ``ValueError`` that includes the offending file path so directory-mode runs are diagnosable. - Add ``json_text_field``: when set, the named key on each JSON item becomes the embedding text and every other key lands in ``metadata``. When unset, JSON dicts are flattened to ``"k: v, k: v"`` (matches the CSV parser) instead of being dumped back to JSON syntax tokens. - Directory-mode ``source_path`` now silently ignores files whose name starts with ``.`` (``.DS_Store``, editor swap files, ``.gitkeep``) and skips unknown-extension files with a warning rather than crashing on the first stray file. - ``glob.glob(source_path, recursive=True)`` so ``**`` patterns walk subdirectories (the docs already advertised this). - Auto-extracted metadata (``file_name``, ``file_path``, ``row_index``, ``item_index``, ``page_number``) now takes precedence over ``metadata_fields`` with the same key (via ``setdefault``). - Expanded ``template_fields`` to include ``file_type``, ``file_extensions``, ``parser`` so they can be driven from Jinja. - Hoisted ``AirflowOptionalProviderFeatureException`` import to the module top so the lazy ``pypdf`` / ``docx`` blocks are 2 lines each. Docs - Switched all inline ``code-block:: python`` snippets to ``exampleinclude::`` directives pointing at a new ``example_document_loader.py`` (basic, directory, bytes, ``json_text_field`` patterns), matching the convention every other operator in this provider uses. - New sections documenting encoding handling, metadata precedence, and the directory-mode skip rules (files whose name starts with a ``.`` / unknown-extension warn-and-skip). Tests - Dropped the tautological ``test_template_fields`` that just round- tripped the class attribute; replaced with a behavioural check confirming the templated fields are actually in the templated set. - New coverage for: dot-prefixed-name skip, unknown-extension warn + skip, ``encoding`` / ``encoding_errors``, ``json_text_field``, JSON dict flatten, CSV empty-cell skip, ``metadata_fields`` precedence (auto wins), recursive ``**`` glob. - PDF/DOCX bytes tests assert the library was called with a ``BytesIO``, locking in the no-temp-file behaviour.
Addresses three follow-ups from the post-rewrite review (after #67120's initial refactor landed in 8f3aee4): 1. Cloud storage URIs via ObjectStoragePath - ``source_path`` now accepts any URI ObjectStoragePath resolves through fsspec (``s3://``, ``gs://``, ``azure://``, ``file://``). Falls back to the existing ``pathlib`` + ``glob`` code path for bare local paths so no existing behaviour changes. - New ``source_conn_id`` parameter to point at the Airflow connection that holds the cloud credentials (``aws_default``, ``google_cloud_default``, ...). Templated so it can be set per-DAG-run. - Parsers stay polymorphic over ``Path`` / ``ObjectStoragePath`` -- both expose ``read_bytes``, ``open``, ``name``, ``suffix`` so the existing read paths work unchanged. - Cross-directory globs in cloud URIs are explicitly not supported in this version; ``source_path`` accepts a single object or a directory. Documented. 2. Loader-not-chunker explicit - Operator docstring and new "No chunking" docs section make it clear the operator parses files into documents but never splits them. The right chunking strategy depends on the embedding model, so it stays in the downstream operator's hands (LlamaIndex EmbeddingOperator, LangChain text splitters, ...). 3. Format coverage roadmap - New docs section enumerates the formats deferred to follow-ups (.pptx, .epub, .xlsx, .html, image OCR, audio transcription), each behind its own optional extra, so reviewers and users see the scope choice explicitly rather than guessing what's missing. Tests - New ``TestCloudUriDispatch`` class covering: single-object URI returns one document, directory URI iterates children, neither-file-nor-dir URI raises with a clear error. ObjectStoragePath is mocked so the tests don't touch real cloud storage. Other ecosystems compared (LangChain BaseLoader + per-format classes; LlamaIndex BaseReader / SimpleDirectoryReader with fsspec; OpenAI / Anthropic / pydantic-ai don't have document-loader abstractions and delegate parsing to the model) -- this commit closes the remaining gap vs LlamaIndex on cloud storage and matches the LangChain naming / output-shape convention.
CI's MyPy providers job flagged the `mock.__str__ = lambda ...` and `mock.__str__.return_value = ...` patterns in TestCloudUriDispatch with ``[method-assign]`` -- mypy treats `__str__` as a real method that shouldn't be reassigned at the instance level, even on a MagicMock. The tests only assert on `file_name`, the dispatched call args, and text content; they never check `metadata.file_path` (which is what `str(path)` would feed). Removing the overrides keeps the assertions intact and lets mypy pass.
kaxil
approved these changes
May 20, 2026
This was referenced May 20, 2026
kaxil
added a commit
that referenced
this pull request
May 21, 2026
…s, cloud URIs Same playbook as #67192 (LangChain) and #67120 (DocumentLoader) plus three LlamaIndex-specific architectural fixes: Critical fixes - Stop mutating LlamaIndex's global ``Settings`` singleton. The previous ``LlamaIndexHook.configure_settings()`` wrote ``Settings.embed_model`` / ``Settings.llm`` process-wide, which leaks across concurrent tasks in the same worker. Replaced with per-call ``embed_model=`` / ``llm=`` parameters on ``VectorStoreIndex(...)`` and ``load_index_from_storage(...)``. - Own ``llamaindex`` connection type instead of squatting on ``pydanticai``. Mirrors the LangChain / CrewAI fix. - Remove ``documents`` from ``EmbeddingOperator.template_fields``. ``list[dict]`` doesn't survive Jinja stringification, and worse, a user document containing literal ``{{ var.value.api_key }}`` would leak secrets into the embedding store. Bind via ``loader.output`` instead. BYO embedding/LLM for non-OpenAI vendors - LlamaIndex doesn't ship an ``init_chat_model`` / ``init_embedding_model`` equivalent (verified in ``llama_index.core.embeddings.utils.resolve_embed_model`` -- only ``"default"`` / ``"local"`` / ``"clip:"`` dispatch). The hook therefore covers OpenAI (matching LlamaIndex's own ``resolve_embed_model("default")`` behaviour) and operators accept a pre-built ``BaseEmbedding`` / ``LLM`` instance to bypass the hook for Cohere / Bedrock / Vertex / HuggingFace / etc. Cloud-URI persistence - ``EmbeddingOperator.persist_dir`` and ``RetrievalOperator.index_persist_dir`` accept storage URIs (``s3://``, ``gs://``, ``azure://``) resolved via ``ObjectStoragePath`` and fsspec, matching the merged ``DocumentLoaderOperator`` pattern. Hook plumbing playbook (mirrors LangChain / CrewAI / DocumentLoader) - ``conn_type = "llamaindex"`` + new ``connection-types`` entry in ``provider.yaml`` with ``embed_model`` / ``llm_model`` conn-fields. - ``default_conn_name`` resolves at runtime via ``llm_conn_id: str | None = None``. - ``_resolve_model`` honours ``conn.extra_dejson`` for parity with the sibling hooks (swallows ``JSONDecodeError``, applies secret masking). - ``get_ui_field_behaviour`` added. - ``[llamaindex]`` extra in ``pyproject.toml`` pinning ``llama-index-core``, ``llama-index-embeddings-openai``, ``llama-index-llms-openai`` (enough to back the hook's default OpenAI return values). Same in the ``dev`` group. Misc operator/test fixes - Wrap lazy ``llama_index`` imports with ``AirflowOptionalProviderFeatureException`` so missing extras surface cleanly. - ``RetrievalOperator`` returns ``{"query": ..., "chunks": [...]}`` (was ``"question"``) and ``chunks[*].node_id`` (was the misleading ``"source"`` key). - ``RetrievalOperator`` raises ``FileNotFoundError`` with a "did you run EmbeddingOperator first?" hint when ``index_persist_dir`` is missing. - All three test files get an autouse fixture stubbing ``llama_index.*`` in ``sys.modules`` so ``@patch`` resolves without ``llama-index-*`` packages installed in CI's non-DB test env (mirrors #67237). - New ``example_llamaindex_hook.py`` with ``[START howto_*]`` markers for the docs to ``exampleinclude``.
kaxil
added a commit
that referenced
this pull request
May 21, 2026
* Add LlamaIndex operators to common.ai provider - Adds LlamaIndexHook to bridge Airflow connections to LlamaIndex's Settings singleton. Reuses the pydanticai connection type, supports separate embedding and LLM connections. - Adds EmbeddingOperator to chunk documents and produce embedding vectors via LlamaIndex's SentenceSplitter. Input is list[dict(text, metadata)] (same shape as DocumentLoaderOperator output), output includes chunks with vectors ready for downstream vector store ingest operators (pgvector, Pinecone, Weaviate). - Adds RetrievalOperator to load a persisted LlamaIndex index and perform similarity search. Output is scored chunks ready for synthesis via LLMOperator. Design notes All LlamaIndex imports are lazy (inside execute() / method bodies), so modules parse without llama-index installed. The hook currently hardcodes OpenAI embedding/LLM providers; a follow-up PR will refactor to use BaseAIHook for provider-agnostic model resolution when it lands. What's included ┌─────────────────────────────────────────┬──────────────────────────────────────────┐ │ File │ Purpose │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ hooks/llamaindex.py │ Hook (~110 lines) │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ operators/llamaindex_embedding.py │ EmbeddingOperator (~110 lines) │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ operators/llamaindex_retrieval.py │ RetrievalOperator (~90 lines) │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ tests/.../test_llamaindex.py │ 12 hook tests │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ tests/.../test_llamaindex_embedding.py │ 10 operator tests │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ tests/.../test_llamaindex_retrieval.py │ 8 operator tests │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ docs/hooks/llamaindex.rst │ Hook docs │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ docs/operators/llamaindex_embedding.rst │ EmbeddingOperator docs │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ docs/operators/llamaindex_retrieval.rst │ RetrievalOperator docs │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ provider.yaml │ Integration, hook, operator registration │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ docs/index.rst │ LlamaIndex Hook in Guides toctree │ ├─────────────────────────────────────────┼──────────────────────────────────────────┤ │ docs/operators/index.rst │ Chooser table rows │ └─────────────────────────────────────────┴──────────────────────────────────────────┘ Test plan - uv run --project providers/common/ai pytest providers/common/ai/tests/unit/common/ai/hooks/test_llamaindex.py -xvs (12 tests) - uv run --project providers/common/ai pytest providers/common/ai/tests/unit/common/ai/operators/test_llamaindex_embedding.py providers/common/ai/tests/unit/common/ai/operators/test_llamaindex_retrieval.py -xvs (18 tests) - Hook: init defaults, separate embed_conn_id, connection kwargs extraction, embedding model, LLM, Settings configuration - EmbeddingOperator: output shape, chunking, index persistence, vector inclusion/omission, splitter params - RetrievalOperator: output shape, chunk keys, top_k forwarding, multiple results, storage context --- Was generative AI tooling used to co-author this PR? - Yes — Claude Code (Opus 4.6) Generated-by: Claude Code (Opus 4.6) following https://github.com/apache/airflow/blob/main/contributing-docs/05_pull_requests.rst#gen-ai-assisted-contributions * Refactor LlamaIndex hook + operators: no Settings mutation, BYO models, cloud URIs Same playbook as #67192 (LangChain) and #67120 (DocumentLoader) plus three LlamaIndex-specific architectural fixes: Critical fixes - Stop mutating LlamaIndex's global ``Settings`` singleton. The previous ``LlamaIndexHook.configure_settings()`` wrote ``Settings.embed_model`` / ``Settings.llm`` process-wide, which leaks across concurrent tasks in the same worker. Replaced with per-call ``embed_model=`` / ``llm=`` parameters on ``VectorStoreIndex(...)`` and ``load_index_from_storage(...)``. - Own ``llamaindex`` connection type instead of squatting on ``pydanticai``. Mirrors the LangChain / CrewAI fix. - Remove ``documents`` from ``EmbeddingOperator.template_fields``. ``list[dict]`` doesn't survive Jinja stringification, and worse, a user document containing literal ``{{ var.value.api_key }}`` would leak secrets into the embedding store. Bind via ``loader.output`` instead. BYO embedding/LLM for non-OpenAI vendors - LlamaIndex doesn't ship an ``init_chat_model`` / ``init_embedding_model`` equivalent (verified in ``llama_index.core.embeddings.utils.resolve_embed_model`` -- only ``"default"`` / ``"local"`` / ``"clip:"`` dispatch). The hook therefore covers OpenAI (matching LlamaIndex's own ``resolve_embed_model("default")`` behaviour) and operators accept a pre-built ``BaseEmbedding`` / ``LLM`` instance to bypass the hook for Cohere / Bedrock / Vertex / HuggingFace / etc. Cloud-URI persistence - ``EmbeddingOperator.persist_dir`` and ``RetrievalOperator.index_persist_dir`` accept storage URIs (``s3://``, ``gs://``, ``azure://``) resolved via ``ObjectStoragePath`` and fsspec, matching the merged ``DocumentLoaderOperator`` pattern. Hook plumbing playbook (mirrors LangChain / CrewAI / DocumentLoader) - ``conn_type = "llamaindex"`` + new ``connection-types`` entry in ``provider.yaml`` with ``embed_model`` / ``llm_model`` conn-fields. - ``default_conn_name`` resolves at runtime via ``llm_conn_id: str | None = None``. - ``_resolve_model`` honours ``conn.extra_dejson`` for parity with the sibling hooks (swallows ``JSONDecodeError``, applies secret masking). - ``get_ui_field_behaviour`` added. - ``[llamaindex]`` extra in ``pyproject.toml`` pinning ``llama-index-core``, ``llama-index-embeddings-openai``, ``llama-index-llms-openai`` (enough to back the hook's default OpenAI return values). Same in the ``dev`` group. Misc operator/test fixes - Wrap lazy ``llama_index`` imports with ``AirflowOptionalProviderFeatureException`` so missing extras surface cleanly. - ``RetrievalOperator`` returns ``{"query": ..., "chunks": [...]}`` (was ``"question"``) and ``chunks[*].node_id`` (was the misleading ``"source"`` key). - ``RetrievalOperator`` raises ``FileNotFoundError`` with a "did you run EmbeddingOperator first?" hint when ``index_persist_dir`` is missing. - All three test files get an autouse fixture stubbing ``llama_index.*`` in ``sys.modules`` so ``@patch`` resolves without ``llama-index-*`` packages installed in CI's non-DB test env (mirrors #67237). - New ``example_llamaindex_hook.py`` with ``[START howto_*]`` markers for the docs to ``exampleinclude``. * Rename LlamaIndex operators with framework prefix; fold in #67189 RAG examples Per Kaxil's review r3267387604: ``RetrievalOperator`` / ``EmbeddingOperator`` are too generic in the common.ai namespace -- they risk colliding when other frameworks add their own embedding/retrieval operators. Renamed both with the LlamaIndex prefix: - ``EmbeddingOperator`` -> ``LlamaIndexEmbeddingOperator`` - ``RetrievalOperator`` -> ``LlamaIndexRetrievalOperator`` Renames applied across the two operator modules, three docs RSTs, the two test files, both example DAGs, and the cross-refs in ``docs/operators/index.rst``, ``docs/hooks/llamaindex.rst``, ``docs/operators/document_loader.rst``, and ``docs/hooks/index.rst``. Folds in #67189 (``example_llamaindex_rag.py``) which would otherwise sit blocked waiting for this PR to merge. Rewritten for the new API: - Uses the renamed classes - Drops ``documents="{{ ti.xcom_pull(...) }}"`` Jinja templating (template_fields removed; bind via ``loader.output`` direct) - Switches LlamaIndex operators to ``llamaindex_default`` conn (was ``pydanticai_default``); the synthesis-step ``LLMOperator`` keeps ``pydanticai_default`` because it's pydantic-ai-backed (different framework, intentional split documented in the module docstring) - Adds explicit ``embed_model="text-embedding-3-small"`` to every embedding/retrieval call (new operator validation requires it) - Fixes the string-reference task chains (``load >> "build_index"`` -> ``load >> build_index``) which weren't valid task dependencies Closes #67189. * Address code-review findings on LlamaIndex operators - Fix ObjectStoragePath conn_id mangling: pass raw URI to LlamaIndex persist_dir= and supply target.fs separately. str(target) returns s3://<conn_id>@<bucket>/..., which fsspec misinterprets. - Add documents / embed_model / embed_conn_id to template_fields so XComArg resolution fires. The previous "list[dict] doesn't survive stringification" rationale was wrong; Templater unwraps resolvables before Jinja. - Default llm_conn_id to None on both operators; LlamaIndexHook resolves to default_conn_name at runtime. Hard-coding "llamaindex_default" undid the hook's careful runtime resolution. - Add embed_conn_id pass-through for separate embedding credentials. - Replace isinstance(str) duck-typing with hasattr-based BaseEmbedding check; raise TypeError with a clear pointer instead of letting an unresolved XComArg or random object explode later. - Hoist 'import os' and 'from pathlib import Path' to module top. - Pad RST title underlines and refresh docs/tests to match the new surface. * Fix mypy on LlamaIndex embedding operator - Pass persist_dir as a typed str arg to _persist so the existing None-narrowing # type: ignore comments can go away. - Cast SentenceSplitter nodes to list[TextNode] for the .text access: the splitter only ever returns TextNode, but the base get_nodes_from_documents signature is typed as list[BaseNode]. * Install llama-index in tests instead of stubbing sys.modules llama-index-core / -embeddings-openai / -llms-openai were declared in the common.ai provider's dev dependency group but missing from uv.lock, so CI never actually installed them. The tests papered over that by faking out llama_index.* in sys.modules with MagicMocks. Refresh uv.lock so the packages get installed, then drop the sys.modules manipulation: - test_llamaindex.py: remove the autouse _stub_llama_index_modules fixture entirely; @patch resolves against the real modules. - test_llamaindex_embedding.py / test_llamaindex_retrieval.py: replace the _stub_li fixture (sys.modules setitem) with a smaller _li fixture that uses monkeypatch.setattr against real llama_index.core symbols. * Apply ruff lint/format fixes --------- Co-authored-by: Kaxil Naik <kaxilnaik@gmail.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Motivation
File parsing is the highest-volume gap in Airflow's AI story
Every RAG pipeline on Airflow currently requires custom parsing code. This operator makes it a single line in a Dag.
What's included
┌────────────────────────────────────┬───────────────────────────────────────────┐
│ File │ Purpose │
├────────────────────────────────────┼───────────────────────────────────────────┤
│ operators/document_loader.py │ Operator (~270 lines) │
├────────────────────────────────────┼───────────────────────────────────────────┤
│ tests/.../test_document_loader.py │ 26 unit tests │
├────────────────────────────────────┼───────────────────────────────────────────┤
│ docs/operators/document_loader.rst │ Usage docs │
├────────────────────────────────────┼───────────────────────────────────────────┤
│ provider.yaml │ Operator registration + how-to-guide link │
├────────────────────────────────────┼───────────────────────────────────────────┤
│ pyproject.toml │ [pdf] and [docx] optional dependencies │
├────────────────────────────────────┼───────────────────────────────────────────┤
│ docs/operators/index.rst │ Chooser table row │
└────────────────────────────────────┴───────────────────────────────────────────┘
Test plan
Was generative AI tooling used to co-author this PR?
Generated-by: [Claude] following the guidelines
{pr_number}.significant.rst, in airflow-core/newsfragments. You can add this file in a follow-up commit after the PR is created so you know the PR number.