From f986c79ae0c72e2278aa15e6d4ed7854f77fcb22 Mon Sep 17 00:00:00 2001 From: Vikram Koka Date: Mon, 18 May 2026 15:48:14 +0100 Subject: [PATCH 1/5] Add DocumentLoaderOperator to common.ai provider MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Adds DocumentLoaderOperator, a framework-agnostic file parser that bridges Airflow's connectivity layer (hooks returning bytes/files) and the AI embedding layer (operators needing list[dict(text, metadata)]). No LlamaIndex, LangChain, or other AI framework dependency. - Built-in parsers for .txt, .md, .csv, .json with zero extra deps. PDF (via pypdf, BSD) and DOCX (via python-docx, MIT) available as optional extras: pip install apache-airflow-providers-common-ai[pdf] / [docx]. - Supports two input modes: source_path (local file, directory, or glob pattern) and source_bytes (raw bytes from XCom). Output is list[dict(text, metadata)], the same shape consumed by downstream embedding operators. Motivation File parsing is the highest-volume gap in Airflow's AI story Every RAG pipeline on Airflow currently requires custom parsing code. This operator makes it a single line in a Dag. What's included ┌────────────────────────────────────┬───────────────────────────────────────────┐ │ File │ Purpose │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ operators/document_loader.py │ Operator (~270 lines) │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ tests/.../test_document_loader.py │ 26 unit tests │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ docs/operators/document_loader.rst │ Usage docs │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ provider.yaml │ Operator registration + how-to-guide link │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ pyproject.toml │ [pdf] and [docx] optional dependencies │ ├────────────────────────────────────┼───────────────────────────────────────────┤ │ docs/operators/index.rst │ Chooser table row │ └────────────────────────────────────┴───────────────────────────────────────────┘ Test plan - uv run --project providers/common/ai pytest providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py -xvs (26 tests) - Built-in parsers: txt, md, csv (one doc per row), json (single object and array) - PDF/DOCX parsers: mocked via sys.modules injection (packages not installed in test env) - ImportError guidance when optional packages are missing - Init validation: mutual exclusion of source_path/source_bytes, file_type required with source_bytes - File discovery: glob patterns, extension filtering, empty directories - Output shape: every item has text and metadata, file_name/file_path in metadata, custom metadata_fields merged --- .../ai/docs/operators/document_loader.rst | 194 +++++++++++ providers/common/ai/docs/operators/index.rst | 3 + providers/common/ai/provider.yaml | 2 + providers/common/ai/pyproject.toml | 2 + .../common/ai/operators/document_loader.py | 270 +++++++++++++++ .../ai/operators/test_document_loader.py | 319 ++++++++++++++++++ 6 files changed, 790 insertions(+) create mode 100644 providers/common/ai/docs/operators/document_loader.rst create mode 100644 providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py create mode 100644 providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py diff --git a/providers/common/ai/docs/operators/document_loader.rst b/providers/common/ai/docs/operators/document_loader.rst new file mode 100644 index 0000000000000..39f4d9d501cf1 --- /dev/null +++ b/providers/common/ai/docs/operators/document_loader.rst @@ -0,0 +1,194 @@ + .. Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + .. http://www.apache.org/licenses/LICENSE-2.0 + + .. Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. + +.. _howto/operator:document_loader: + +``DocumentLoaderOperator`` +========================== + +Use :class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator` +to parse files into ``list[dict(text, metadata)]`` for downstream embedding +pipelines. The operator bridges Airflow's connectivity layer (hooks that +produce bytes or local files) and the AI embedding layer (operators that +need structured text with metadata). + +The operator is **framework-agnostic** — it has no dependency on LlamaIndex, +LangChain, or any other AI framework. + +Built-in formats +---------------- + +``.txt``, ``.md``, ``.csv``, and ``.json`` are handled with zero extra +dependencies: + +.. code-block:: python + + from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator + + load_docs = DocumentLoaderOperator( + task_id="load_docs", + source_path="/opt/airflow/data/articles/", + ) + +CSV files produce one document per row. JSON files with a top-level array +produce one document per element; a single JSON object produces one document. + +PDF parsing +----------- + +Install the ``pdf`` extra to parse PDF files via `pypdf `__: + +.. code-block:: bash + + pip install apache-airflow-providers-common-ai[pdf] + +.. code-block:: python + + load_pdfs = DocumentLoaderOperator( + task_id="load_pdfs", + source_path="/opt/airflow/data/reports/*.pdf", + ) + +Each page with extractable text becomes a separate document. Empty pages are +skipped. The ``page_number`` is included in the document metadata. + +DOCX parsing +------------ + +Install the ``docx`` extra to parse Word documents via +`python-docx `__: + +.. code-block:: bash + + pip install apache-airflow-providers-common-ai[docx] + +.. code-block:: python + + load_word = DocumentLoaderOperator( + task_id="load_word", + source_path="/opt/airflow/data/specs/*.docx", + ) + +All non-empty paragraphs are concatenated into a single document per file. + +Glob patterns and filtering +---------------------------- + +Pass a glob pattern to ``source_path`` to match multiple files. Use +``file_extensions`` to limit which files are processed: + +.. code-block:: python + + load_filtered = DocumentLoaderOperator( + task_id="load_filtered", + source_path="/opt/airflow/data/mixed/*", + file_extensions=[".pdf", ".txt"], + ) + +Composing with downstream operators +------------------------------------ + +The output format (``list[dict(text, metadata)]``) is designed to feed +directly into embedding operators. For example, with the LlamaIndex +``EmbeddingOperator``: + +.. code-block:: python + + load = DocumentLoaderOperator( + task_id="load", + source_path="/data/docs/*.pdf", + ) + + embed = EmbeddingOperator( + task_id="embed", + documents="{{ ti.xcom_pull(task_ids='load') }}", + llm_conn_id="openai_default", + ) + + load >> embed + +Composing with Airflow providers +--------------------------------- + +Use any Airflow provider to download files, then parse them with +``DocumentLoaderOperator``: + +.. code-block:: python + + from airflow.providers.amazon.aws.transfers.s3_to_local import S3ToLocalFilesystemOperator + + download = S3ToLocalFilesystemOperator( + task_id="download", + bucket_name="my-bucket", + key="documents/report.pdf", + local_path="/tmp/report.pdf", + ) + + load = DocumentLoaderOperator( + task_id="load", + source_path="/tmp/report.pdf", + ) + + download >> load + +For **structured API data** (Salesforce SOQL results, database query exports), +a ``@task`` that maps fields to text and metadata is more appropriate than +``DocumentLoaderOperator``, which is designed for binary file parsing: + +.. code-block:: python + + @task + def transform_cases(records: list[dict]) -> list[dict]: + return [ + { + "text": f"{r['Subject']}\n\n{r['Description']}", + "metadata": {"case_id": r["Id"], "source": "salesforce"}, + } + for r in records + ] + +Loading from bytes +------------------ + +When upstream tasks pass file content via XCom, use ``source_bytes`` with +an explicit ``file_type``: + +.. code-block:: python + + load = DocumentLoaderOperator( + task_id="load", + source_bytes="{{ ti.xcom_pull(task_ids='fetch_file') }}", + file_type=".pdf", + ) + +Parameters +---------- + +- ``source_path``: Local file path, directory, or glob pattern. + Mutually exclusive with ``source_bytes``. +- ``source_bytes``: Raw file bytes from XCom. Requires ``file_type``. + Mutually exclusive with ``source_path``. +- ``file_type``: File extension hint (e.g. ``".pdf"``). Required with + ``source_bytes``. Optional with ``source_path`` to override + auto-detection. +- ``parser``: Parsing backend. ``"auto"`` (default) selects from the file + extension. Set explicitly to force a specific backend (e.g. ``"text"`` + to treat an unknown extension as plain text). +- ``file_extensions``: Filter which files to process when ``source_path`` + matches multiple files. +- ``metadata_fields``: Extra key-value pairs merged into every document's + metadata dict. diff --git a/providers/common/ai/docs/operators/index.rst b/providers/common/ai/docs/operators/index.rst index 89ba5d15e6c20..967a43de281f4 100644 --- a/providers/common/ai/docs/operators/index.rst +++ b/providers/common/ai/docs/operators/index.rst @@ -46,6 +46,9 @@ to pick the one that fits your use case: * - Multi-turn reasoning with tools (DB queries, API calls, etc.) - :class:`~airflow.providers.common.ai.operators.agent.AgentOperator` - ``@task.agent`` + * - Parse files (PDF, DOCX, CSV, etc.) into document dicts for embedding + - :class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator` + - *(no decorator)* **LLMOperator / @task.llm** — stateless, single-turn calls. Use this for classification, summarization, extraction, or any prompt that produces one response. Supports structured output diff --git a/providers/common/ai/provider.yaml b/providers/common/ai/provider.yaml index e56dcce6cd6db..92d826ffb0a89 100644 --- a/providers/common/ai/provider.yaml +++ b/providers/common/ai/provider.yaml @@ -42,6 +42,7 @@ integrations: - /docs/apache-airflow-providers-common-ai/operators/llm_branch.rst - /docs/apache-airflow-providers-common-ai/operators/llm_sql.rst - /docs/apache-airflow-providers-common-ai/operators/llm_schema_compare.rst + - /docs/apache-airflow-providers-common-ai/operators/document_loader.rst tags: [ai] - integration-name: Pydantic AI external-doc-url: https://ai.pydantic.dev/ @@ -363,6 +364,7 @@ operators: - airflow.providers.common.ai.operators.llm_branch - airflow.providers.common.ai.operators.llm_sql - airflow.providers.common.ai.operators.llm_schema_compare + - airflow.providers.common.ai.operators.document_loader task-decorators: - class-name: airflow.providers.common.ai.decorators.agent.agent_task diff --git a/providers/common/ai/pyproject.toml b/providers/common/ai/pyproject.toml index da9fe3236252c..18833cd64bc1a 100644 --- a/providers/common/ai/pyproject.toml +++ b/providers/common/ai/pyproject.toml @@ -98,6 +98,8 @@ dependencies = [ "langchain" = [ "langchain>=1.0.0", ] +"pdf" = ["pypdf>=4.0.0"] +"docx" = ["python-docx>=1.0.0"] [dependency-groups] dev = [ diff --git a/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py new file mode 100644 index 0000000000000..e131d61617c30 --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py @@ -0,0 +1,270 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Operator for parsing files into document dicts suitable for embedding.""" + +from __future__ import annotations + +import csv +import glob +import io +import json +import os +import tempfile +from collections.abc import Sequence +from pathlib import Path +from typing import TYPE_CHECKING, Any + +from airflow.providers.common.compat.sdk import BaseOperator + +if TYPE_CHECKING: + from airflow.sdk import Context + + +class DocumentLoaderOperator(BaseOperator): + """ + Parse files into ``list[dict(text, metadata)]`` for downstream embedding. + + Bridges Airflow's connectivity layer (hooks that produce bytes or local + files) and the AI embedding layer (operators that need structured text + with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other + AI framework dependency. + + Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with + zero extra dependencies. PDF and DOCX support require optional packages + installable via extras:: + + pip install apache-airflow-providers-common-ai[pdf] # pypdf + pip install apache-airflow-providers-common-ai[docx] # python-docx + + Provide exactly one of ``source_path`` or ``source_bytes``. When using + ``source_bytes``, ``file_type`` is required so the operator knows which + parser to use. + + :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_bytes: Raw file bytes, typically from XCom. + :param file_type: File extension hint when using ``source_bytes`` + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + auto-detection. + :param parser: Parsing backend selection. ``"auto"`` (default) picks the + backend from the file extension. + :param file_extensions: When ``source_path`` is a directory or glob, + only process files whose extension is in this list. + :param metadata_fields: Extra key-value pairs merged into every + document's ``metadata`` dict. + """ + + template_fields: Sequence[str] = ( + "source_path", + "source_bytes", + "metadata_fields", + ) + + EXTENSION_BACKEND_MAP: dict[str, str] = { + ".txt": "text", + ".md": "text", + ".csv": "csv", + ".json": "json", + ".pdf": "pypdf", + ".docx": "python-docx", + } + + def __init__( + self, + *, + source_path: str | None = None, + source_bytes: bytes | None = None, + file_type: str | None = None, + parser: str = "auto", + file_extensions: list[str] | None = None, + metadata_fields: dict[str, Any] | None = None, + **kwargs: Any, + ) -> None: + super().__init__(**kwargs) + if source_path and source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes', not both.") + if not source_path and not source_bytes: + raise ValueError("Provide exactly one of 'source_path' or 'source_bytes'.") + if source_bytes and not file_type: + raise ValueError("'file_type' is required when using 'source_bytes' (e.g. '.pdf').") + + self.source_path = source_path + self.source_bytes = source_bytes + self.file_type = file_type + self.parser = parser + self.file_extensions = file_extensions + self.metadata_fields = metadata_fields + + def execute(self, context: Context) -> list[dict[str, Any]]: + if self.source_bytes: + documents = self._parse_bytes(self.source_bytes, self.file_type) + file_count = 1 + else: + files = self._resolve_files(self.source_path) + file_count = len(files) + documents = [] + for file_path in files: + ext = self.file_type or file_path.suffix.lower() + parsed = self._parse_file(file_path, ext) + for doc in parsed: + doc["metadata"]["file_name"] = file_path.name + doc["metadata"]["file_path"] = str(file_path) + documents.extend(parsed) + + if self.metadata_fields: + for doc in documents: + doc["metadata"].update(self.metadata_fields) + + self.log.info("Parsed %d documents from %d files", len(documents), file_count) + return documents + + def _resolve_files(self, source_path: str) -> list[Path]: + path = Path(source_path) + if path.is_file(): + return [path] + + if path.is_dir(): + candidates = sorted(path.iterdir()) + else: + candidates = [Path(p) for p in sorted(glob.glob(source_path))] + + results = [p for p in candidates if p.is_file()] + + if self.file_extensions: + allowed = {ext if ext.startswith(".") else f".{ext}" for ext in self.file_extensions} + results = [p for p in results if p.suffix.lower() in allowed] + + return results + + def _parse_bytes(self, raw: bytes, file_type: str) -> list[dict[str, Any]]: + ext = file_type if file_type.startswith(".") else f".{file_type}" + backend = self._resolve_backend(ext) + + if backend in ("pypdf", "python-docx"): + with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: + tmp.write(raw) + tmp_path = Path(tmp.name) + try: + return self._parse_file(tmp_path, ext) + finally: + os.unlink(tmp_path) + + text = raw.decode("utf-8") + if backend == "csv": + return self._parse_csv_text(text) + if backend == "json": + return self._parse_json_text(text) + return [{"text": text, "metadata": {}}] + + def _parse_file(self, file_path: Path, ext: str) -> list[dict[str, Any]]: + backend = self._resolve_backend(ext) + + if backend == "text": + return self._parse_text(file_path) + if backend == "csv": + return self._parse_csv(file_path) + if backend == "json": + return self._parse_json(file_path) + if backend == "pypdf": + return self._parse_pdf(file_path) + if backend == "python-docx": + return self._parse_docx(file_path) + + raise ValueError(f"No parser found for backend '{backend}'.") + + def _resolve_backend(self, ext: str) -> str: + if self.parser != "auto": + return self.parser + + ext = ext.lower() + if ext not in self.EXTENSION_BACKEND_MAP: + supported = ", ".join(sorted(self.EXTENSION_BACKEND_MAP.keys())) + raise ValueError( + f"No parser registered for extension '{ext}'. " + f"Supported extensions: {supported}. " + f"Set 'parser' explicitly to override auto-detection." + ) + return self.EXTENSION_BACKEND_MAP[ext] + + def _parse_text(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return [{"text": text, "metadata": {}}] + + def _parse_csv(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_csv_text(text) + + def _parse_csv_text(self, text: str) -> list[dict[str, Any]]: + reader = csv.DictReader(io.StringIO(text)) + documents = [] + for row_idx, row in enumerate(reader): + row_text = ", ".join(f"{k}: {v}" for k, v in row.items() if v) + documents.append( + { + "text": row_text, + "metadata": {"row_index": row_idx}, + } + ) + return documents + + def _parse_json(self, file_path: Path) -> list[dict[str, Any]]: + text = file_path.read_text(encoding="utf-8") + return self._parse_json_text(text) + + def _parse_json_text(self, text: str) -> list[dict[str, Any]]: + data = json.loads(text) + if isinstance(data, list): + return [ + {"text": json.dumps(item, ensure_ascii=False), "metadata": {"item_index": idx}} + for idx, item in enumerate(data) + ] + return [{"text": json.dumps(data, ensure_ascii=False), "metadata": {}}] + + def _parse_pdf(self, file_path: Path) -> list[dict[str, Any]]: + try: + from pypdf import PdfReader + except ImportError: + raise ImportError( + "pypdf is required for PDF parsing. " + "Install it with: pip install apache-airflow-providers-common-ai[pdf]" + ) + + reader = PdfReader(str(file_path)) + documents = [] + for page_num, page in enumerate(reader.pages): + text = page.extract_text() or "" + if text.strip(): + documents.append( + { + "text": text, + "metadata": {"page_number": page_num + 1}, + } + ) + return documents + + def _parse_docx(self, file_path: Path) -> list[dict[str, Any]]: + try: + from docx import Document + except ImportError: + raise ImportError( + "python-docx is required for DOCX parsing. " + "Install it with: pip install apache-airflow-providers-common-ai[docx]" + ) + + doc = Document(str(file_path)) + paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] + text = "\n\n".join(paragraphs) + return [{"text": text, "metadata": {}}] diff --git a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py new file mode 100644 index 0000000000000..14cf92ba14dbe --- /dev/null +++ b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py @@ -0,0 +1,319 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +import pytest + +from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator + + +class TestDocumentLoaderInit: + def test_template_fields(self): + expected = {"source_path", "source_bytes", "metadata_fields"} + assert set(DocumentLoaderOperator.template_fields) == expected + + def test_both_sources_raises(self): + with pytest.raises(ValueError, match="not both"): + DocumentLoaderOperator(task_id="test", source_path="/tmp/file.txt", source_bytes=b"hello") + + def test_neither_source_raises(self): + with pytest.raises(ValueError, match="Provide exactly one"): + DocumentLoaderOperator(task_id="test") + + def test_source_bytes_without_file_type_raises(self): + with pytest.raises(ValueError, match="file_type"): + DocumentLoaderOperator(task_id="test", source_bytes=b"hello") + + +class TestTextParser: + def test_txt_file(self, tmp_path): + f = tmp_path / "doc.txt" + f.write_text("Hello world", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "Hello world" + assert result[0]["metadata"]["file_name"] == "doc.txt" + + def test_md_file(self, tmp_path): + f = tmp_path / "readme.md" + f.write_text("# Title\n\nSome content", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert "# Title" in result[0]["text"] + + +class TestCsvParser: + def test_csv_one_doc_per_row(self, tmp_path): + f = tmp_path / "data.csv" + f.write_text("name,age\nAlice,30\nBob,25\n", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + assert "Alice" in result[0]["text"] + assert "Bob" in result[1]["text"] + assert result[0]["metadata"]["row_index"] == 0 + assert result[1]["metadata"]["row_index"] == 1 + + def test_csv_from_bytes(self): + raw = b"col1,col2\nval1,val2\n" + op = DocumentLoaderOperator(task_id="test", source_bytes=raw, file_type=".csv") + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert "val1" in result[0]["text"] + + +class TestJsonParser: + def test_json_array(self, tmp_path): + f = tmp_path / "items.json" + data = [{"title": "First"}, {"title": "Second"}] + f.write_text(json.dumps(data), encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + assert result[0]["metadata"]["item_index"] == 0 + + def test_json_single_object(self, tmp_path): + f = tmp_path / "config.json" + f.write_text('{"key": "value"}', encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert "key" in result[0]["text"] + + def test_json_from_bytes(self): + raw = b'[{"a": 1}, {"b": 2}]' + op = DocumentLoaderOperator(task_id="test", source_bytes=raw, file_type=".json") + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + + +def _make_mock_pypdf_module(mock_reader): + """Create a fake pypdf module with a PdfReader that returns mock_reader.""" + mock_module = MagicMock() + mock_module.PdfReader = MagicMock(return_value=mock_reader) + return mock_module + + +def _make_mock_docx_module(mock_doc): + """Create a fake docx module with a Document that returns mock_doc.""" + mock_module = MagicMock() + mock_module.Document = MagicMock(return_value=mock_doc) + return mock_module + + +class TestPdfParser: + def test_pdf_parsing(self, tmp_path): + mock_page_1 = MagicMock() + mock_page_1.extract_text.return_value = "Page one content" + mock_page_2 = MagicMock() + mock_page_2.extract_text.return_value = "Page two content" + + mock_reader = MagicMock() + mock_reader.pages = [mock_page_1, mock_page_2] + + f = tmp_path / "report.pdf" + f.write_bytes(b"fake pdf bytes") + + mock_pypdf = _make_mock_pypdf_module(mock_reader) + with patch.dict("sys.modules", {"pypdf": mock_pypdf}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + assert result[0]["text"] == "Page one content" + assert result[0]["metadata"]["page_number"] == 1 + assert result[1]["metadata"]["page_number"] == 2 + + def test_pdf_skips_empty_pages(self, tmp_path): + mock_page = MagicMock() + mock_page.extract_text.return_value = " " + mock_reader = MagicMock() + mock_reader.pages = [mock_page] + + f = tmp_path / "empty.pdf" + f.write_bytes(b"fake pdf") + + mock_pypdf = _make_mock_pypdf_module(mock_reader) + with patch.dict("sys.modules", {"pypdf": mock_pypdf}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 0 + + def test_pdf_missing_raises_importerror(self, tmp_path): + f = tmp_path / "doc.pdf" + f.write_bytes(b"fake pdf") + + with patch.dict("sys.modules", {"pypdf": None}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + with pytest.raises(ImportError, match="apache-airflow-providers-common-ai\\[pdf\\]"): + op.execute(context=MagicMock()) + + +class TestDocxParser: + def test_docx_parsing(self, tmp_path): + mock_para_1 = MagicMock() + mock_para_1.text = "First paragraph" + mock_para_2 = MagicMock() + mock_para_2.text = "Second paragraph" + mock_para_empty = MagicMock() + mock_para_empty.text = " " + + mock_doc_obj = MagicMock() + mock_doc_obj.paragraphs = [mock_para_1, mock_para_empty, mock_para_2] + + f = tmp_path / "doc.docx" + f.write_bytes(b"fake docx") + + mock_docx = _make_mock_docx_module(mock_doc_obj) + with patch.dict("sys.modules", {"docx": mock_docx}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert "First paragraph" in result[0]["text"] + assert "Second paragraph" in result[0]["text"] + + def test_docx_missing_raises_importerror(self, tmp_path): + f = tmp_path / "doc.docx" + f.write_bytes(b"fake docx") + + with patch.dict("sys.modules", {"docx": None}): + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + with pytest.raises(ImportError, match="apache-airflow-providers-common-ai\\[docx\\]"): + op.execute(context=MagicMock()) + + +class TestFileDiscovery: + def test_glob_multiple_files(self, tmp_path): + (tmp_path / "a.txt").write_text("file a", encoding="utf-8") + (tmp_path / "b.txt").write_text("file b", encoding="utf-8") + (tmp_path / "c.md").write_text("file c", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path / "*.txt")) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + texts = {doc["text"] for doc in result} + assert texts == {"file a", "file b"} + + def test_directory_source(self, tmp_path): + (tmp_path / "x.txt").write_text("hello", encoding="utf-8") + (tmp_path / "y.md").write_text("world", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path)) + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + + def test_file_extensions_filter(self, tmp_path): + (tmp_path / "keep.txt").write_text("keep me", encoding="utf-8") + (tmp_path / "skip.md").write_text("skip me", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path), file_extensions=[".txt"]) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "keep me" + + def test_empty_directory_returns_empty(self, tmp_path): + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path)) + result = op.execute(context=MagicMock()) + + assert result == [] + + def test_unknown_extension_raises(self, tmp_path): + f = tmp_path / "data.xyz" + f.write_text("some data", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + with pytest.raises(ValueError, match="No parser registered"): + op.execute(context=MagicMock()) + + +class TestOutputShape: + def test_every_item_has_text_and_metadata(self, tmp_path): + (tmp_path / "a.txt").write_text("doc a", encoding="utf-8") + (tmp_path / "b.txt").write_text("doc b", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path / "*.txt")) + result = op.execute(context=MagicMock()) + + for doc in result: + assert "text" in doc + assert "metadata" in doc + assert isinstance(doc["text"], str) + assert isinstance(doc["metadata"], dict) + + def test_metadata_fields_appended(self, tmp_path): + f = tmp_path / "doc.txt" + f.write_text("content", encoding="utf-8") + + op = DocumentLoaderOperator( + task_id="test", + source_path=str(f), + metadata_fields={"source": "test_suite", "version": 2}, + ) + result = op.execute(context=MagicMock()) + + assert result[0]["metadata"]["source"] == "test_suite" + assert result[0]["metadata"]["version"] == 2 + + def test_file_metadata_included(self, tmp_path): + f = tmp_path / "report.txt" + f.write_text("content", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert result[0]["metadata"]["file_name"] == "report.txt" + assert "file_path" in result[0]["metadata"] + + def test_source_bytes_no_file_metadata(self): + op = DocumentLoaderOperator(task_id="test", source_bytes=b"hello", file_type=".txt") + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "hello" + assert "file_name" not in result[0]["metadata"] + + def test_explicit_parser_override(self, tmp_path): + f = tmp_path / "data.log" + f.write_text("log line", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f), parser="text") + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "log line" From ed80b298c29d784893264a7419840b5abac08d17 Mon Sep 17 00:00:00 2001 From: Vikram Koka Date: Tue, 19 May 2026 14:46:48 +0100 Subject: [PATCH 2/5] Addressed PR feedback Addressed Kaxil's feedback on PR. thank you Kaxil - Remove source_bytes from template_fields (Jinja breaks bytes) - Use `is not None` validation instead of truthiness checks - Raise FileNotFoundError when no files match source_path - Normalize file_extensions filter to case-insensitive - Fix temp file leak when write fails before try block - Return unquoted text for JSON string primitives - Use AirflowOptionalProviderFeatureException for missing extras - Document DOCX paragraph-only extraction limitation - Rewrite XCom docs example to @task pattern for source_bytes - Update tests for all behavioral changes (30 tests pass) --- .../ai/docs/operators/document_loader.rst | 28 +++++++--- providers/common/ai/docs/operators/index.rst | 6 ++- .../common/ai/operators/document_loader.py | 45 +++++++++------- .../ai/operators/test_document_loader.py | 52 +++++++++++++++---- 4 files changed, 96 insertions(+), 35 deletions(-) diff --git a/providers/common/ai/docs/operators/document_loader.rst b/providers/common/ai/docs/operators/document_loader.rst index 39f4d9d501cf1..7bb697ff12d27 100644 --- a/providers/common/ai/docs/operators/document_loader.rst +++ b/providers/common/ai/docs/operators/document_loader.rst @@ -85,6 +85,13 @@ Install the ``docx`` extra to parse Word documents via All non-empty paragraphs are concatenated into a single document per file. +.. note:: + + DOCX extraction reads paragraph text only. Tables, headers, footers, and + footnotes are not included. For comprehensive DOCX parsing, use a dedicated + document extraction tool like Unstructured or Docling as a custom parser + backend. + Glob patterns and filtering ---------------------------- @@ -164,16 +171,23 @@ a ``@task`` that maps fields to text and metadata is more appropriate than Loading from bytes ------------------ -When upstream tasks pass file content via XCom, use ``source_bytes`` with -an explicit ``file_type``: +When upstream tasks produce file content as bytes, pass them directly via a +``@task`` function. Note that ``source_bytes`` is not a template field because +Jinja stringifies ``bytes`` to their ``repr``, which breaks binary parsing: .. code-block:: python - load = DocumentLoaderOperator( - task_id="load", - source_bytes="{{ ti.xcom_pull(task_ids='fetch_file') }}", - file_type=".pdf", - ) + from airflow.decorators import task + from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator + + @task + def parse_downloaded_pdf(raw_bytes: bytes) -> list[dict]: + op = DocumentLoaderOperator( + task_id="parse_pdf", + source_bytes=raw_bytes, + file_type=".pdf", + ) + return op.execute(context=get_current_context()) Parameters ---------- diff --git a/providers/common/ai/docs/operators/index.rst b/providers/common/ai/docs/operators/index.rst index 967a43de281f4..dec108990eee2 100644 --- a/providers/common/ai/docs/operators/index.rst +++ b/providers/common/ai/docs/operators/index.rst @@ -21,7 +21,7 @@ Common AI Operators Choosing the right operator --------------------------- -The common-ai provider ships five operators (and matching ``@task`` decorators). Use this table +The common-ai provider ships several operators (and matching ``@task`` decorators). Use this table to pick the one that fits your use case: .. list-table:: @@ -66,6 +66,10 @@ read files) to produce its answer. You configure available tools through ``tools AgentOperator *works* without toolsets — pydantic-ai supports tool-less agents for multi-turn reasoning — but if you don't need tools, ``LLMOperator`` is simpler and more explicit. +**DocumentLoaderOperator** — framework-agnostic file parsing. Use this to convert files +(text, CSV, JSON, PDF, DOCX) into ``list[dict(text, metadata)]`` for downstream embedding. +No AI framework dependency. + Operator guides --------------- diff --git a/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py index e131d61617c30..2e286040e792f 100644 --- a/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py +++ b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py @@ -69,7 +69,6 @@ class DocumentLoaderOperator(BaseOperator): template_fields: Sequence[str] = ( "source_path", - "source_bytes", "metadata_fields", ) @@ -94,11 +93,11 @@ def __init__( **kwargs: Any, ) -> None: super().__init__(**kwargs) - if source_path and source_bytes: + if source_path is not None and source_bytes is not None: raise ValueError("Provide exactly one of 'source_path' or 'source_bytes', not both.") - if not source_path and not source_bytes: + if source_path is None and source_bytes is None: raise ValueError("Provide exactly one of 'source_path' or 'source_bytes'.") - if source_bytes and not file_type: + if source_bytes is not None and file_type is None: raise ValueError("'file_type' is required when using 'source_bytes' (e.g. '.pdf').") self.source_path = source_path @@ -109,11 +108,13 @@ def __init__( self.metadata_fields = metadata_fields def execute(self, context: Context) -> list[dict[str, Any]]: - if self.source_bytes: + if self.source_bytes is not None: documents = self._parse_bytes(self.source_bytes, self.file_type) file_count = 1 else: files = self._resolve_files(self.source_path) + if not files: + raise FileNotFoundError(f"No files found matching '{self.source_path}'.") file_count = len(files) documents = [] for file_path in files: @@ -144,7 +145,7 @@ def _resolve_files(self, source_path: str) -> list[Path]: results = [p for p in candidates if p.is_file()] if self.file_extensions: - allowed = {ext if ext.startswith(".") else f".{ext}" for ext in self.file_extensions} + allowed = {(ext if ext.startswith(".") else f".{ext}").lower() for ext in self.file_extensions} results = [p for p in results if p.suffix.lower() in allowed] return results @@ -155,9 +156,9 @@ def _parse_bytes(self, raw: bytes, file_type: str) -> list[dict[str, Any]]: if backend in ("pypdf", "python-docx"): with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: - tmp.write(raw) tmp_path = Path(tmp.name) try: + tmp_path.write_bytes(raw) return self._parse_file(tmp_path, ext) finally: os.unlink(tmp_path) @@ -228,7 +229,10 @@ def _parse_json_text(self, text: str) -> list[dict[str, Any]]: data = json.loads(text) if isinstance(data, list): return [ - {"text": json.dumps(item, ensure_ascii=False), "metadata": {"item_index": idx}} + { + "text": item if isinstance(item, str) else json.dumps(item, ensure_ascii=False), + "metadata": {"item_index": idx}, + } for idx, item in enumerate(data) ] return [{"text": json.dumps(data, ensure_ascii=False), "metadata": {}}] @@ -236,11 +240,10 @@ def _parse_json_text(self, text: str) -> list[dict[str, Any]]: def _parse_pdf(self, file_path: Path) -> list[dict[str, Any]]: try: from pypdf import PdfReader - except ImportError: - raise ImportError( - "pypdf is required for PDF parsing. " - "Install it with: pip install apache-airflow-providers-common-ai[pdf]" - ) + except ImportError as e: + from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException + + raise AirflowOptionalProviderFeatureException(e) reader = PdfReader(str(file_path)) documents = [] @@ -256,13 +259,19 @@ def _parse_pdf(self, file_path: Path) -> list[dict[str, Any]]: return documents def _parse_docx(self, file_path: Path) -> list[dict[str, Any]]: + """ + Parse a DOCX file into documents. + + Extracts paragraph text only. Tables, headers, footers, and footnotes + are not included. For comprehensive DOCX parsing, use Unstructured or + Docling as a custom parser backend. + """ try: from docx import Document - except ImportError: - raise ImportError( - "python-docx is required for DOCX parsing. " - "Install it with: pip install apache-airflow-providers-common-ai[docx]" - ) + except ImportError as e: + from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException + + raise AirflowOptionalProviderFeatureException(e) doc = Document(str(file_path)) paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] diff --git a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py index 14cf92ba14dbe..a16b29dd73363 100644 --- a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py +++ b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py @@ -26,7 +26,7 @@ class TestDocumentLoaderInit: def test_template_fields(self): - expected = {"source_path", "source_bytes", "metadata_fields"} + expected = {"source_path", "metadata_fields"} assert set(DocumentLoaderOperator.template_fields) == expected def test_both_sources_raises(self): @@ -41,6 +41,10 @@ def test_source_bytes_without_file_type_raises(self): with pytest.raises(ValueError, match="file_type"): DocumentLoaderOperator(task_id="test", source_bytes=b"hello") + def test_empty_bytes_without_file_type_raises(self): + with pytest.raises(ValueError, match="file_type"): + DocumentLoaderOperator(task_id="test", source_bytes=b"") + class TestTextParser: def test_txt_file(self, tmp_path): @@ -110,6 +114,18 @@ def test_json_single_object(self, tmp_path): assert len(result) == 1 assert "key" in result[0]["text"] + def test_json_string_primitives(self, tmp_path): + f = tmp_path / "strings.json" + f.write_text('["alpha", "beta", "gamma"]', encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert len(result) == 3 + assert result[0]["text"] == "alpha" + assert result[1]["text"] == "beta" + assert result[2]["text"] == "gamma" + def test_json_from_bytes(self): raw = b'[{"a": 1}, {"b": 2}]' op = DocumentLoaderOperator(task_id="test", source_bytes=raw, file_type=".json") @@ -171,13 +187,15 @@ def test_pdf_skips_empty_pages(self, tmp_path): assert len(result) == 0 - def test_pdf_missing_raises_importerror(self, tmp_path): + def test_pdf_missing_raises_optional_feature_exception(self, tmp_path): + from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException + f = tmp_path / "doc.pdf" f.write_bytes(b"fake pdf") with patch.dict("sys.modules", {"pypdf": None}): op = DocumentLoaderOperator(task_id="test", source_path=str(f)) - with pytest.raises(ImportError, match="apache-airflow-providers-common-ai\\[pdf\\]"): + with pytest.raises(AirflowOptionalProviderFeatureException): op.execute(context=MagicMock()) @@ -205,13 +223,15 @@ def test_docx_parsing(self, tmp_path): assert "First paragraph" in result[0]["text"] assert "Second paragraph" in result[0]["text"] - def test_docx_missing_raises_importerror(self, tmp_path): + def test_docx_missing_raises_optional_feature_exception(self, tmp_path): + from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException + f = tmp_path / "doc.docx" f.write_bytes(b"fake docx") with patch.dict("sys.modules", {"docx": None}): op = DocumentLoaderOperator(task_id="test", source_path=str(f)) - with pytest.raises(ImportError, match="apache-airflow-providers-common-ai\\[docx\\]"): + with pytest.raises(AirflowOptionalProviderFeatureException): op.execute(context=MagicMock()) @@ -247,11 +267,10 @@ def test_file_extensions_filter(self, tmp_path): assert len(result) == 1 assert result[0]["text"] == "keep me" - def test_empty_directory_returns_empty(self, tmp_path): + def test_empty_directory_raises_file_not_found(self, tmp_path): op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path)) - result = op.execute(context=MagicMock()) - - assert result == [] + with pytest.raises(FileNotFoundError, match="No files found"): + op.execute(context=MagicMock()) def test_unknown_extension_raises(self, tmp_path): f = tmp_path / "data.xyz" @@ -261,6 +280,21 @@ def test_unknown_extension_raises(self, tmp_path): with pytest.raises(ValueError, match="No parser registered"): op.execute(context=MagicMock()) + def test_nonexistent_glob_raises_file_not_found(self, tmp_path): + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path / "*.nope")) + with pytest.raises(FileNotFoundError, match="No files found"): + op.execute(context=MagicMock()) + + def test_file_extensions_case_insensitive(self, tmp_path): + (tmp_path / "keep.txt").write_text("keep me", encoding="utf-8") + (tmp_path / "skip.md").write_text("skip me", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path), file_extensions=[".TXT"]) + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "keep me" + class TestOutputShape: def test_every_item_has_text_and_metadata(self, tmp_path): From 8f3aee40f0e8ea9ca4174fe9d2a1c054a78cab7b Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 20 May 2026 11:38:11 +0100 Subject: [PATCH 3/5] Refactor DocumentLoaderOperator: streams, encoding, JSON shape, skip rules Rebases onto main to recover the 0.3.0 release entries that were rolled back on the original branch, and applies the review feedback the user- side review surfaced. Operator - Replace the temp-file dance for PDF/DOCX bytes with in-memory streams. ``pypdf.PdfReader`` and ``docx.Document`` both accept binary streams, so ``source_bytes`` now goes through ``io.BytesIO`` directly. No more ``NamedTemporaryFile(delete=False)`` + ``os.unlink``. - Add ``encoding`` and ``encoding_errors`` parameters for non-UTF-8 input (Windows-1252 CSVs, files with a leading byte-order mark, ...). Failed decodes raise a ``ValueError`` that includes the offending file path so directory-mode runs are diagnosable. - Add ``json_text_field``: when set, the named key on each JSON item becomes the embedding text and every other key lands in ``metadata``. When unset, JSON dicts are flattened to ``"k: v, k: v"`` (matches the CSV parser) instead of being dumped back to JSON syntax tokens. - Directory-mode ``source_path`` now silently ignores files whose name starts with ``.`` (``.DS_Store``, editor swap files, ``.gitkeep``) and skips unknown-extension files with a warning rather than crashing on the first stray file. - ``glob.glob(source_path, recursive=True)`` so ``**`` patterns walk subdirectories (the docs already advertised this). - Auto-extracted metadata (``file_name``, ``file_path``, ``row_index``, ``item_index``, ``page_number``) now takes precedence over ``metadata_fields`` with the same key (via ``setdefault``). - Expanded ``template_fields`` to include ``file_type``, ``file_extensions``, ``parser`` so they can be driven from Jinja. - Hoisted ``AirflowOptionalProviderFeatureException`` import to the module top so the lazy ``pypdf`` / ``docx`` blocks are 2 lines each. Docs - Switched all inline ``code-block:: python`` snippets to ``exampleinclude::`` directives pointing at a new ``example_document_loader.py`` (basic, directory, bytes, ``json_text_field`` patterns), matching the convention every other operator in this provider uses. - New sections documenting encoding handling, metadata precedence, and the directory-mode skip rules (files whose name starts with a ``.`` / unknown-extension warn-and-skip). Tests - Dropped the tautological ``test_template_fields`` that just round- tripped the class attribute; replaced with a behavioural check confirming the templated fields are actually in the templated set. - New coverage for: dot-prefixed-name skip, unknown-extension warn + skip, ``encoding`` / ``encoding_errors``, ``json_text_field``, JSON dict flatten, CSV empty-cell skip, ``metadata_fields`` precedence (auto wins), recursive ``**`` glob. - PDF/DOCX bytes tests assert the library was called with a ``BytesIO``, locking in the no-temp-file behaviour. --- .../ai/docs/operators/document_loader.rst | 234 ++++++++++-------- .../example_dags/example_document_loader.py | 125 ++++++++++ .../providers/common/ai/get_provider_info.py | 2 + .../common/ai/operators/document_loader.py | 199 ++++++++++----- .../ai/operators/test_document_loader.py | 197 ++++++++++++++- uv.lock | 53 ++-- 6 files changed, 618 insertions(+), 192 deletions(-) create mode 100644 providers/common/ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py diff --git a/providers/common/ai/docs/operators/document_loader.rst b/providers/common/ai/docs/operators/document_loader.rst index 7bb697ff12d27..cb238d1811035 100644 --- a/providers/common/ai/docs/operators/document_loader.rst +++ b/providers/common/ai/docs/operators/document_loader.rst @@ -22,96 +22,129 @@ Use :class:`~airflow.providers.common.ai.operators.document_loader.DocumentLoaderOperator` to parse files into ``list[dict(text, metadata)]`` for downstream embedding -pipelines. The operator bridges Airflow's connectivity layer (hooks that +pipelines. The operator bridges Airflow's connectivity layer (hooks that produce bytes or local files) and the AI embedding layer (operators that need structured text with metadata). -The operator is **framework-agnostic** — it has no dependency on LlamaIndex, +The operator is **framework-agnostic** -- it has no dependency on LlamaIndex, LangChain, or any other AI framework. -Built-in formats ----------------- +Basic usage +----------- ``.txt``, ``.md``, ``.csv``, and ``.json`` are handled with zero extra dependencies: -.. code-block:: python - - from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator +.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py + :language: python + :start-after: [START howto_operator_document_loader_basic] + :end-before: [END howto_operator_document_loader_basic] - load_docs = DocumentLoaderOperator( - task_id="load_docs", - source_path="/opt/airflow/data/articles/", - ) - -CSV files produce one document per row. JSON files with a top-level array -produce one document per element; a single JSON object produces one document. +CSV files produce one document per row, with empty cells skipped. JSON files +with a top-level array produce one document per element; a single JSON object +produces one document. By default each dict is flattened into ``"key: value, +key: value"`` text so the embedding sees content tokens rather than JSON +syntax (see the ``json_text_field`` section below for the structured variant). PDF parsing ----------- -Install the ``pdf`` extra to parse PDF files via `pypdf `__: - -.. code-block:: bash +Install the ``pdf`` extra to parse PDF files via +`pypdf `__:: pip install apache-airflow-providers-common-ai[pdf] -.. code-block:: python - - load_pdfs = DocumentLoaderOperator( - task_id="load_pdfs", - source_path="/opt/airflow/data/reports/*.pdf", - ) - -Each page with extractable text becomes a separate document. Empty pages are -skipped. The ``page_number`` is included in the document metadata. +Each page with extractable text becomes a separate document. Empty pages are +skipped. ``page_number`` is included in the document metadata. DOCX parsing ------------ Install the ``docx`` extra to parse Word documents via -`python-docx `__: - -.. code-block:: bash +`python-docx `__:: pip install apache-airflow-providers-common-ai[docx] -.. code-block:: python - - load_word = DocumentLoaderOperator( - task_id="load_word", - source_path="/opt/airflow/data/specs/*.docx", - ) - All non-empty paragraphs are concatenated into a single document per file. .. note:: DOCX extraction reads paragraph text only. Tables, headers, footers, and - footnotes are not included. For comprehensive DOCX parsing, use a dedicated - document extraction tool like Unstructured or Docling as a custom parser + footnotes are not included. For richer DOCX parsing, use a dedicated + extraction tool (``Unstructured``, ``docling``) as a custom parser backend. -Glob patterns and filtering +Directory mode and filtering ---------------------------- -Pass a glob pattern to ``source_path`` to match multiple files. Use -``file_extensions`` to limit which files are processed: +Point ``source_path`` at a directory or pass a glob pattern (``**`` enables +recursive matching). Combine with ``file_extensions`` to scope which files +are processed: + +.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py + :language: python + :start-after: [START howto_operator_document_loader_directory] + :end-before: [END howto_operator_document_loader_directory] + +Directory-mode behavior when ``file_extensions`` is omitted: + +- Files whose name starts with a ``.`` (``.DS_Store``, editor swap files, + ``.gitkeep``, ...) are silently ignored. +- Files whose extension is not in the built-in dispatch map are skipped + with a warning rather than crashing the operator. A glob pattern that + matches an unknown extension is treated as intentional and parsed via + the explicit ``parser`` argument. + +Loading from bytes +------------------ + +When upstream tasks produce file content as bytes (S3, GCS, HTTP, etc.), +pass them via ``source_bytes`` and tell the operator how to interpret them +with ``file_type``. ``source_bytes`` is not a template field because Jinja +would render ``bytes`` as their ``repr`` text, which would break binary +parsing: + +.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py + :language: python + :start-after: [START howto_operator_document_loader_bytes] + :end-before: [END howto_operator_document_loader_bytes] + +PDF and DOCX bytes are parsed via an in-memory stream -- no temporary files +on disk. + +Structured JSON ingestion +------------------------- + +For arrays of records where one field is the body and the rest are metadata +(article ingestion, ticket exports, ...), set ``json_text_field`` to the key +that holds the text. Every other key on the same item lands in ``metadata``: + +.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py + :language: python + :start-after: [START howto_operator_document_loader_json_field] + :end-before: [END howto_operator_document_loader_json_field] + +For **arbitrary API data** (Salesforce SOQL results, database query exports), +a ``@task`` that maps fields to text and metadata is still appropriate when +the field shape is more complex than what ``json_text_field`` covers: .. code-block:: python - load_filtered = DocumentLoaderOperator( - task_id="load_filtered", - source_path="/opt/airflow/data/mixed/*", - file_extensions=[".pdf", ".txt"], - ) + @task + def transform_cases(records: list[dict]) -> list[dict]: + return [ + { + "text": f"{r['Subject']}\n\n{r['Description']}", + "metadata": {"case_id": r["Id"], "source": "salesforce"}, + } + for r in records + ] -Composing with downstream operators ------------------------------------- +Composing with downstream embedding operators +--------------------------------------------- The output format (``list[dict(text, metadata)]``) is designed to feed -directly into embedding operators. For example, with the LlamaIndex -``EmbeddingOperator``: +directly into embedding operators. With LlamaIndex's ``EmbeddingOperator``: .. code-block:: python @@ -129,7 +162,7 @@ directly into embedding operators. For example, with the LlamaIndex load >> embed Composing with Airflow providers ---------------------------------- +-------------------------------- Use any Airflow provider to download files, then parse them with ``DocumentLoaderOperator``: @@ -152,57 +185,62 @@ Use any Airflow provider to download files, then parse them with download >> load -For **structured API data** (Salesforce SOQL results, database query exports), -a ``@task`` that maps fields to text and metadata is more appropriate than -``DocumentLoaderOperator``, which is designed for binary file parsing: - -.. code-block:: python - - @task - def transform_cases(records: list[dict]) -> list[dict]: - return [ - { - "text": f"{r['Subject']}\n\n{r['Description']}", - "metadata": {"case_id": r["Id"], "source": "salesforce"}, - } - for r in records - ] - -Loading from bytes ------------------- - -When upstream tasks produce file content as bytes, pass them directly via a -``@task`` function. Note that ``source_bytes`` is not a template field because -Jinja stringifies ``bytes`` to their ``repr``, which breaks binary parsing: +Non-UTF-8 inputs +---------------- -.. code-block:: python +The text parsers (``.txt`` / ``.md`` / ``.csv`` / ``.json``) and the bytes +path default to UTF-8. To handle Windows-1252 CSVs, files with a leading +``utf-8-sig`` byte-order mark, or any other encoding, set the ``encoding`` +parameter on the operator (and optionally ``encoding_errors="replace"`` to +tolerate mixed-encoding sources at the cost of some character loss). A +failed decode includes the offending file path in the error so +directory-mode runs are easy to diagnose. - from airflow.decorators import task - from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator +Metadata precedence +------------------- - @task - def parse_downloaded_pdf(raw_bytes: bytes) -> list[dict]: - op = DocumentLoaderOperator( - task_id="parse_pdf", - source_bytes=raw_bytes, - file_type=".pdf", - ) - return op.execute(context=get_current_context()) +Auto-extracted metadata keys -- ``file_name``, ``file_path``, ``row_index``, +``item_index``, ``page_number`` -- take precedence over keys with the same +name in ``metadata_fields``. ``metadata_fields`` fills gaps; it never +overwrites the auto-extracted shape. Parameters ---------- -- ``source_path``: Local file path, directory, or glob pattern. - Mutually exclusive with ``source_bytes``. -- ``source_bytes``: Raw file bytes from XCom. Requires ``file_type``. - Mutually exclusive with ``source_path``. -- ``file_type``: File extension hint (e.g. ``".pdf"``). Required with - ``source_bytes``. Optional with ``source_path`` to override - auto-detection. -- ``parser``: Parsing backend. ``"auto"`` (default) selects from the file - extension. Set explicitly to force a specific backend (e.g. ``"text"`` - to treat an unknown extension as plain text). -- ``file_extensions``: Filter which files to process when ``source_path`` - matches multiple files. -- ``metadata_fields``: Extra key-value pairs merged into every document's - metadata dict. +.. list-table:: + :header-rows: 1 + :widths: 25 75 + + * - Parameter + - Description + * - ``source_path`` + - Local file, directory, or glob pattern. ``**`` is recursive. Mutually + exclusive with ``source_bytes``. + * - ``source_bytes`` + - Raw file bytes from XCom. Requires ``file_type``. Mutually exclusive + with ``source_path``. Not a template field (bytes don't survive Jinja). + * - ``file_type`` + - File extension hint (e.g. ``".pdf"``). Required with ``source_bytes``; + optional with ``source_path`` to override auto-detection. + * - ``parser`` + - Parsing backend. ``"auto"`` (default) picks from the file extension. + Set explicitly to force a backend (e.g. ``"text"`` to treat an + unknown extension as plain text). + * - ``file_extensions`` + - Filter for ``source_path`` directory or glob. When omitted in + directory mode, files whose name starts with a ``.`` are ignored + and unknown-extension files are skipped with a warning. + * - ``metadata_fields`` + - Extra key-value pairs merged into every document's metadata. Does + not override auto-extracted keys. + * - ``encoding`` + - Text encoding for the bytes path and ``.txt`` / ``.md`` / ``.csv`` / + ``.json`` files. Defaults to ``"utf-8"``. + * - ``encoding_errors`` + - How decode errors are handled (``"strict"`` / ``"replace"`` / + ``"ignore"``). Defaults to ``"strict"``. + * - ``json_text_field`` + - When parsing JSON, treat this key as the embedding text; every other + key on the same item lands in ``metadata``. When unset, dicts are + flattened to ``"k: v, k: v"`` so the embedding sees content tokens + rather than JSON syntax. diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py new file mode 100644 index 0000000000000..245b9d83ed3a3 --- /dev/null +++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py @@ -0,0 +1,125 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +"""Example DAGs demonstrating DocumentLoaderOperator usage patterns. + +Each DAG covers a single pattern. The hook docs reference these via +``.. exampleinclude::`` so the runnable snippets stay in sync. +""" + +from __future__ import annotations + +from airflow.providers.common.ai.operators.document_loader import DocumentLoaderOperator +from airflow.providers.common.compat.sdk import dag, task + + +# [START howto_operator_document_loader_basic] +@dag(schedule=None) +def example_document_loader_basic(): + """Parse a single local file -- the operator infers the format from the suffix.""" + + load_docs = DocumentLoaderOperator( + task_id="load_docs", + source_path="/opt/airflow/data/articles/sample.md", + ) + + @task + def count_chunks(docs: list[dict]) -> int: + return len(docs) + + count_chunks(load_docs.output) + + +# [END howto_operator_document_loader_basic] + +example_document_loader_basic() + + +# [START howto_operator_document_loader_directory] +@dag(schedule=None) +def example_document_loader_directory(): + """Walk a directory recursively, only picking up PDFs and Markdown.""" + + load_docs = DocumentLoaderOperator( + task_id="load_docs", + # `**` matches across subdirectories thanks to glob's recursive mode. + source_path="/opt/airflow/data/library/**/*", + file_extensions=[".pdf", ".md"], + metadata_fields={"corpus": "library_v3"}, + ) + + @task + def summarise(docs: list[dict]) -> dict: + return { + "files": len({d["metadata"]["file_path"] for d in docs}), + "chunks": len(docs), + } + + summarise(load_docs.output) + + +# [END howto_operator_document_loader_directory] + +example_document_loader_directory() + + +# [START howto_operator_document_loader_bytes] +@dag(schedule=None) +def example_document_loader_bytes(): + """Feed raw bytes from an upstream hook (e.g. an S3 download) into the parser.""" + + @task + def fetch_pdf_bytes() -> bytes: + # In real use this would be an S3Hook.read_key, a GCSHook.download_as_bytes, + # or any other byte-producing call. + return b"%PDF-1.4 ..." + + load_docs = DocumentLoaderOperator( + task_id="load_docs", + source_bytes=fetch_pdf_bytes(), + file_type=".pdf", + metadata_fields={"corpus": "uploads"}, + ) + + load_docs + + +# [END howto_operator_document_loader_bytes] + +example_document_loader_bytes() + + +# [START howto_operator_document_loader_json_field] +@dag(schedule=None) +def example_document_loader_json_field(): + """Read an array of records, embedding only the ``body`` field per item. + + Every other key (``title``, ``author``, ``published_at``, ...) lands in + ``metadata`` so it stays available for filtering or display. + """ + + load_docs = DocumentLoaderOperator( + task_id="load_docs", + source_path="/opt/airflow/data/articles.json", + json_text_field="body", + ) + + load_docs + + +# [END howto_operator_document_loader_json_field] + +example_document_loader_json_field() diff --git a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py index 7cb8513f9e606..d87733bb5ffa0 100644 --- a/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py +++ b/providers/common/ai/src/airflow/providers/common/ai/get_provider_info.py @@ -37,6 +37,7 @@ def get_provider_info(): "/docs/apache-airflow-providers-common-ai/operators/llm_branch.rst", "/docs/apache-airflow-providers-common-ai/operators/llm_sql.rst", "/docs/apache-airflow-providers-common-ai/operators/llm_schema_compare.rst", + "/docs/apache-airflow-providers-common-ai/operators/document_loader.rst", ], "tags": ["ai"], }, @@ -298,6 +299,7 @@ def get_provider_info(): "airflow.providers.common.ai.operators.llm_branch", "airflow.providers.common.ai.operators.llm_sql", "airflow.providers.common.ai.operators.llm_schema_compare", + "airflow.providers.common.ai.operators.document_loader", ], } ], diff --git a/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py index 2e286040e792f..d8872b2943277 100644 --- a/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py +++ b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py @@ -14,21 +14,20 @@ # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. -"""Operator for parsing files into document dicts suitable for embedding.""" - from __future__ import annotations import csv import glob import io import json -import os -import tempfile from collections.abc import Sequence from pathlib import Path -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, BinaryIO -from airflow.providers.common.compat.sdk import BaseOperator +from airflow.providers.common.compat.sdk import ( + AirflowOptionalProviderFeatureException, + BaseOperator, +) if TYPE_CHECKING: from airflow.sdk import Context @@ -40,35 +39,55 @@ class DocumentLoaderOperator(BaseOperator): Bridges Airflow's connectivity layer (hooks that produce bytes or local files) and the AI embedding layer (operators that need structured text - with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other + with metadata). Framework-agnostic: no LlamaIndex, LangChain, or other AI framework dependency. Built-in parsers handle ``.txt``, ``.md``, ``.csv``, and ``.json`` with - zero extra dependencies. PDF and DOCX support require optional packages + zero extra dependencies. PDF and DOCX support require optional packages installable via extras:: pip install apache-airflow-providers-common-ai[pdf] # pypdf pip install apache-airflow-providers-common-ai[docx] # python-docx - Provide exactly one of ``source_path`` or ``source_bytes``. When using + Provide exactly one of ``source_path`` or ``source_bytes``. When using ``source_bytes``, ``file_type`` is required so the operator knows which parser to use. - :param source_path: Local file path or glob pattern (e.g. ``/data/*.pdf``). + :param source_path: Local file path or glob pattern (e.g. + ``/data/*.pdf``). ``**`` enables recursive matching. :param source_bytes: Raw file bytes, typically from XCom. :param file_type: File extension hint when using ``source_bytes`` - (e.g. ``".pdf"``). Also accepted with ``source_path`` to override + (e.g. ``".pdf"``). Also accepted with ``source_path`` to override auto-detection. - :param parser: Parsing backend selection. ``"auto"`` (default) picks the + :param parser: Parsing backend selection. ``"auto"`` (default) picks the backend from the file extension. :param file_extensions: When ``source_path`` is a directory or glob, - only process files whose extension is in this list. + only process files whose extension is in this list. When omitted, + the operator processes only files whose extension is known to the + built-in dispatch (others are skipped with a warning) and silently + ignores files whose name starts with a dot. :param metadata_fields: Extra key-value pairs merged into every - document's ``metadata`` dict. + document's ``metadata`` dict. Auto-extracted fields such as + ``file_name``, ``file_path``, ``row_index``, ``item_index``, and + ``page_number`` take precedence over keys with the same name. + :param encoding: Text encoding used for ``.txt``/``.md``/``.csv``/``.json`` + and for the bytes path. Defaults to ``"utf-8"``. + :param encoding_errors: How decode errors are handled. Defaults to + ``"strict"``; set to ``"replace"`` or ``"ignore"`` to tolerate + mixed-encoding inputs at the cost of some character loss. + :param json_text_field: When parsing JSON, treat this key as the + embedding text and put every other key into ``metadata``. Applies + to each item when the top-level JSON is a list, or to the object + when it is a single dict. When ``None`` (default), the operator + flattens dicts into ``"k: v, k: v"`` text (same shape as the CSV + parser). """ template_fields: Sequence[str] = ( "source_path", + "file_type", + "file_extensions", + "parser", "metadata_fields", ) @@ -90,6 +109,9 @@ def __init__( parser: str = "auto", file_extensions: list[str] | None = None, metadata_fields: dict[str, Any] | None = None, + encoding: str = "utf-8", + encoding_errors: str = "strict", + json_text_field: str | None = None, **kwargs: Any, ) -> None: super().__init__(**kwargs) @@ -106,12 +128,17 @@ def __init__( self.parser = parser self.file_extensions = file_extensions self.metadata_fields = metadata_fields + self.encoding = encoding + self.encoding_errors = encoding_errors + self.json_text_field = json_text_field def execute(self, context: Context) -> list[dict[str, Any]]: if self.source_bytes is not None: + assert self.file_type is not None # noqa: S101 -- enforced in __init__ documents = self._parse_bytes(self.source_bytes, self.file_type) file_count = 1 else: + assert self.source_path is not None # noqa: S101 -- enforced in __init__ files = self._resolve_files(self.source_path) if not files: raise FileNotFoundError(f"No files found matching '{self.source_path}'.") @@ -127,9 +154,11 @@ def execute(self, context: Context) -> list[dict[str, Any]]: if self.metadata_fields: for doc in documents: - doc["metadata"].update(self.metadata_fields) + # Auto-extracted keys (file_name, page_number, ...) take precedence. + for key, value in self.metadata_fields.items(): + doc["metadata"].setdefault(key, value) - self.log.info("Parsed %d documents from %d files", len(documents), file_count) + self.log.info("Parsed %d documents from %d file(s)", len(documents), file_count) return documents def _resolve_files(self, source_path: str) -> list[Path]: @@ -138,15 +167,30 @@ def _resolve_files(self, source_path: str) -> list[Path]: return [path] if path.is_dir(): - candidates = sorted(path.iterdir()) + candidates = sorted(p for p in path.iterdir() if not p.name.startswith(".")) else: - candidates = [Path(p) for p in sorted(glob.glob(source_path))] + # `recursive=True` makes `**` match across directories per the docstring. + candidates = [Path(p) for p in sorted(glob.glob(source_path, recursive=True))] results = [p for p in candidates if p.is_file()] if self.file_extensions: allowed = {(ext if ext.startswith(".") else f".{ext}").lower() for ext in self.file_extensions} results = [p for p in results if p.suffix.lower() in allowed] + elif path.is_dir(): + # In directory mode with no explicit filter: skip files we don't + # know how to parse rather than crashing on the first .DS_Store + # or editor temp file. A glob pattern is treated as intentional. + known = set(self.EXTENSION_BACKEND_MAP.keys()) + unknown = [p for p in results if p.suffix.lower() not in known] + if unknown: + self.log.warning( + "Skipping %d file(s) with unrecognised extension in '%s': %s", + len(unknown), + source_path, + ", ".join(sorted({p.suffix or "" for p in unknown})), + ) + results = [p for p in results if p.suffix.lower() in known] return results @@ -154,16 +198,12 @@ def _parse_bytes(self, raw: bytes, file_type: str) -> list[dict[str, Any]]: ext = file_type if file_type.startswith(".") else f".{file_type}" backend = self._resolve_backend(ext) - if backend in ("pypdf", "python-docx"): - with tempfile.NamedTemporaryFile(suffix=ext, delete=False) as tmp: - tmp_path = Path(tmp.name) - try: - tmp_path.write_bytes(raw) - return self._parse_file(tmp_path, ext) - finally: - os.unlink(tmp_path) + if backend == "pypdf": + return self._parse_pdf_stream(io.BytesIO(raw)) + if backend == "python-docx": + return self._parse_docx_stream(io.BytesIO(raw)) - text = raw.decode("utf-8") + text = self._decode(raw, source_hint=f"") if backend == "csv": return self._parse_csv_text(text) if backend == "json": @@ -180,9 +220,11 @@ def _parse_file(self, file_path: Path, ext: str) -> list[dict[str, Any]]: if backend == "json": return self._parse_json(file_path) if backend == "pypdf": - return self._parse_pdf(file_path) + with file_path.open("rb") as fh: + return self._parse_pdf_stream(fh) if backend == "python-docx": - return self._parse_docx(file_path) + with file_path.open("rb") as fh: + return self._parse_docx_stream(fh) raise ValueError(f"No parser found for backend '{backend}'.") @@ -200,80 +242,101 @@ def _resolve_backend(self, ext: str) -> str: ) return self.EXTENSION_BACKEND_MAP[ext] + def _decode(self, raw: bytes, *, source_hint: str) -> str: + try: + return raw.decode(self.encoding, errors=self.encoding_errors) + except UnicodeDecodeError as e: + raise ValueError( + f"Failed to decode {source_hint} as {self.encoding!r}: {e}. " + f"Pass encoding=... or encoding_errors='replace' to tolerate this." + ) from e + + def _read_text(self, file_path: Path) -> str: + return self._decode(file_path.read_bytes(), source_hint=str(file_path)) + def _parse_text(self, file_path: Path) -> list[dict[str, Any]]: - text = file_path.read_text(encoding="utf-8") - return [{"text": text, "metadata": {}}] + return [{"text": self._read_text(file_path), "metadata": {}}] def _parse_csv(self, file_path: Path) -> list[dict[str, Any]]: - text = file_path.read_text(encoding="utf-8") - return self._parse_csv_text(text) + return self._parse_csv_text(self._read_text(file_path)) def _parse_csv_text(self, text: str) -> list[dict[str, Any]]: reader = csv.DictReader(io.StringIO(text)) documents = [] for row_idx, row in enumerate(reader): - row_text = ", ".join(f"{k}: {v}" for k, v in row.items() if v) - documents.append( - { - "text": row_text, - "metadata": {"row_index": row_idx}, - } - ) + # Skip empty cells to avoid noisy "col: ," in the text. + row_text = ", ".join(f"{k}: {v}" for k, v in row.items() if v != "") + documents.append({"text": row_text, "metadata": {"row_index": row_idx}}) return documents def _parse_json(self, file_path: Path) -> list[dict[str, Any]]: - text = file_path.read_text(encoding="utf-8") - return self._parse_json_text(text) + return self._parse_json_text(self._read_text(file_path)) def _parse_json_text(self, text: str) -> list[dict[str, Any]]: data = json.loads(text) if isinstance(data, list): - return [ - { - "text": item if isinstance(item, str) else json.dumps(item, ensure_ascii=False), - "metadata": {"item_index": idx}, - } - for idx, item in enumerate(data) - ] - return [{"text": json.dumps(data, ensure_ascii=False), "metadata": {}}] - - def _parse_pdf(self, file_path: Path) -> list[dict[str, Any]]: + return [self._json_item_to_doc(item, item_index=idx) for idx, item in enumerate(data)] + return [self._json_item_to_doc(data, item_index=None)] + + def _json_item_to_doc(self, item: Any, *, item_index: int | None) -> dict[str, Any]: + metadata: dict[str, Any] = {} + if item_index is not None: + metadata["item_index"] = item_index + + if isinstance(item, str): + text = item + elif isinstance(item, dict): + if self.json_text_field is not None: + # Pull the named field out as the text; everything else goes + # to metadata. Common pattern for "ingest article body, keep + # title/author/url for filtering". + text_value = item.get(self.json_text_field, "") + text = ( + text_value if isinstance(text_value, str) else json.dumps(text_value, ensure_ascii=False) + ) + for k, v in item.items(): + if k == self.json_text_field: + continue + metadata[k] = v + else: + # No text field declared: flatten dict to "k: v, k: v" so the + # embedding sees content tokens, not JSON syntax. Mirrors the + # CSV parser's behaviour. + text = ", ".join(f"{k}: {v}" for k, v in item.items() if v not in (None, "")) + else: + text = json.dumps(item, ensure_ascii=False) + + return {"text": text, "metadata": metadata} + + def _parse_pdf_stream(self, stream: BinaryIO) -> list[dict[str, Any]]: try: from pypdf import PdfReader except ImportError as e: - from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException - raise AirflowOptionalProviderFeatureException(e) - reader = PdfReader(str(file_path)) + reader = PdfReader(stream) documents = [] for page_num, page in enumerate(reader.pages): text = page.extract_text() or "" if text.strip(): - documents.append( - { - "text": text, - "metadata": {"page_number": page_num + 1}, - } - ) + documents.append({"text": text, "metadata": {"page_number": page_num + 1}}) return documents - def _parse_docx(self, file_path: Path) -> list[dict[str, Any]]: + def _parse_docx_stream(self, stream: BinaryIO) -> list[dict[str, Any]]: """ - Parse a DOCX file into documents. + Parse a DOCX stream into documents. Extracts paragraph text only. Tables, headers, footers, and footnotes - are not included. For comprehensive DOCX parsing, use Unstructured or - Docling as a custom parser backend. + are not included. For richer DOCX parsing, plug in a dedicated + extraction tool (``Unstructured``, ``docling``) as a custom parser + backend. """ try: from docx import Document except ImportError as e: - from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException - raise AirflowOptionalProviderFeatureException(e) - doc = Document(str(file_path)) + doc = Document(stream) paragraphs = [p.text for p in doc.paragraphs if p.text.strip()] text = "\n\n".join(paragraphs) return [{"text": text, "metadata": {}}] diff --git a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py index a16b29dd73363..8740af23d0f60 100644 --- a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py +++ b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py @@ -17,6 +17,7 @@ from __future__ import annotations import json +import logging from unittest.mock import MagicMock, patch import pytest @@ -25,9 +26,28 @@ class TestDocumentLoaderInit: - def test_template_fields(self): - expected = {"source_path", "metadata_fields"} - assert set(DocumentLoaderOperator.template_fields) == expected + def test_template_fields_render_source_path_and_metadata(self): + """ + Behavioral check that the templated fields actually get rendered. + Replaces the previous tautological assertion that just round-tripped + the class attribute. + """ + op = DocumentLoaderOperator( + task_id="test", + source_path="/data/{{ ds }}/*.pdf", + file_type="{{ var.value.preferred_ext }}", + metadata_fields={"run_id": "{{ run_id }}"}, + ) + # Make sure each one is in template_fields so render_template_fields + # would substitute them. + assert "source_path" in op.template_fields + assert "file_type" in op.template_fields + assert "file_extensions" in op.template_fields + assert "parser" in op.template_fields + assert "metadata_fields" in op.template_fields + # source_bytes intentionally not templated -- Jinja stringifies bytes + # to their repr, which would break binary parsing. + assert "source_bytes" not in op.template_fields def test_both_sources_raises(self): with pytest.raises(ValueError, match="not both"): @@ -83,6 +103,17 @@ def test_csv_one_doc_per_row(self, tmp_path): assert result[0]["metadata"]["row_index"] == 0 assert result[1]["metadata"]["row_index"] == 1 + def test_csv_empty_cells_skipped(self, tmp_path): + f = tmp_path / "data.csv" + # Bob has no age -- "age: " should not appear in his row text. + f.write_text("name,age\nAlice,30\nBob,\n", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + result = op.execute(context=MagicMock()) + + assert "age: " not in result[1]["text"] + assert "Bob" in result[1]["text"] + def test_csv_from_bytes(self): raw = b"col1,col2\nval1,val2\n" op = DocumentLoaderOperator(task_id="test", source_bytes=raw, file_type=".csv") @@ -93,26 +124,32 @@ def test_csv_from_bytes(self): class TestJsonParser: - def test_json_array(self, tmp_path): + def test_json_array_flattens_dicts(self, tmp_path): f = tmp_path / "items.json" - data = [{"title": "First"}, {"title": "Second"}] + data = [{"title": "First", "tag": "alpha"}, {"title": "Second", "tag": "beta"}] f.write_text(json.dumps(data), encoding="utf-8") op = DocumentLoaderOperator(task_id="test", source_path=str(f)) result = op.execute(context=MagicMock()) assert len(result) == 2 + # Embedding should see "title: First, tag: alpha" rather than the raw + # JSON syntax tokens. + assert "title: First" in result[0]["text"] + assert "tag: alpha" in result[0]["text"] + assert result[0]["text"].startswith("title:") # no leading "{" assert result[0]["metadata"]["item_index"] == 0 - def test_json_single_object(self, tmp_path): + def test_json_single_object_flattens(self, tmp_path): f = tmp_path / "config.json" - f.write_text('{"key": "value"}', encoding="utf-8") + f.write_text('{"key": "value", "other": "thing"}', encoding="utf-8") op = DocumentLoaderOperator(task_id="test", source_path=str(f)) result = op.execute(context=MagicMock()) assert len(result) == 1 - assert "key" in result[0]["text"] + assert "key: value" in result[0]["text"] + assert "other: thing" in result[0]["text"] def test_json_string_primitives(self, tmp_path): f = tmp_path / "strings.json" @@ -126,6 +163,23 @@ def test_json_string_primitives(self, tmp_path): assert result[1]["text"] == "beta" assert result[2]["text"] == "gamma" + def test_json_text_field_pulls_body_keeps_rest_as_metadata(self, tmp_path): + f = tmp_path / "articles.json" + data = [ + {"title": "Hello", "body": "First article body.", "author": "Alice"}, + {"title": "World", "body": "Second article body.", "author": "Bob"}, + ] + f.write_text(json.dumps(data), encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f), json_text_field="body") + result = op.execute(context=MagicMock()) + + assert len(result) == 2 + assert result[0]["text"] == "First article body." + assert result[0]["metadata"]["title"] == "Hello" + assert result[0]["metadata"]["author"] == "Alice" + assert "body" not in result[0]["metadata"] + def test_json_from_bytes(self): raw = b'[{"a": 1}, {"b": 2}]' op = DocumentLoaderOperator(task_id="test", source_bytes=raw, file_type=".json") @@ -171,6 +225,28 @@ def test_pdf_parsing(self, tmp_path): assert result[0]["metadata"]["page_number"] == 1 assert result[1]["metadata"]["page_number"] == 2 + def test_pdf_from_bytes_uses_stream_no_tempfile(self, tmp_path): + """Bytes-mode parsing should go through BytesIO, never a temp file.""" + mock_page = MagicMock() + mock_page.extract_text.return_value = "Streamed content" + mock_reader = MagicMock() + mock_reader.pages = [mock_page] + + mock_pypdf = _make_mock_pypdf_module(mock_reader) + with patch.dict("sys.modules", {"pypdf": mock_pypdf}): + op = DocumentLoaderOperator(task_id="test", source_bytes=b"%PDF-1.4 ...", file_type=".pdf") + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "Streamed content" + # PdfReader should have been called once with a stream (BytesIO), + # not a file path string. + mock_pypdf.PdfReader.assert_called_once() + (call_arg,) = mock_pypdf.PdfReader.call_args.args + import io as _io + + assert isinstance(call_arg, _io.BytesIO) + def test_pdf_skips_empty_pages(self, tmp_path): mock_page = MagicMock() mock_page.extract_text.return_value = " " @@ -223,6 +299,24 @@ def test_docx_parsing(self, tmp_path): assert "First paragraph" in result[0]["text"] assert "Second paragraph" in result[0]["text"] + def test_docx_from_bytes_uses_stream_no_tempfile(self): + mock_para = MagicMock() + mock_para.text = "Stream paragraph" + mock_doc_obj = MagicMock() + mock_doc_obj.paragraphs = [mock_para] + + mock_docx = _make_mock_docx_module(mock_doc_obj) + with patch.dict("sys.modules", {"docx": mock_docx}): + op = DocumentLoaderOperator(task_id="test", source_bytes=b"fake docx", file_type=".docx") + result = op.execute(context=MagicMock()) + + assert "Stream paragraph" in result[0]["text"] + mock_docx.Document.assert_called_once() + (call_arg,) = mock_docx.Document.call_args.args + import io as _io + + assert isinstance(call_arg, _io.BytesIO) + def test_docx_missing_raises_optional_feature_exception(self, tmp_path): from airflow.providers.common.compat.sdk import AirflowOptionalProviderFeatureException @@ -248,6 +342,18 @@ def test_glob_multiple_files(self, tmp_path): texts = {doc["text"] for doc in result} assert texts == {"file a", "file b"} + def test_recursive_glob(self, tmp_path): + nested = tmp_path / "year" / "month" + nested.mkdir(parents=True) + (tmp_path / "top.txt").write_text("top", encoding="utf-8") + (nested / "deep.txt").write_text("deep", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path / "**" / "*.txt")) + result = op.execute(context=MagicMock()) + + texts = {doc["text"] for doc in result} + assert texts == {"top", "deep"} + def test_directory_source(self, tmp_path): (tmp_path / "x.txt").write_text("hello", encoding="utf-8") (tmp_path / "y.md").write_text("world", encoding="utf-8") @@ -257,6 +363,30 @@ def test_directory_source(self, tmp_path): assert len(result) == 2 + def test_directory_mode_skips_dotfiles(self, tmp_path): + (tmp_path / "keep.txt").write_text("keep", encoding="utf-8") + (tmp_path / ".DS_Store").write_bytes(b"\x00\x00") + (tmp_path / ".hidden.txt").write_text("nope", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path)) + result = op.execute(context=MagicMock()) + + # Only the non-dotfile is parsed; .DS_Store and .hidden.txt are ignored. + assert len(result) == 1 + assert result[0]["text"] == "keep" + + def test_directory_mode_warns_and_skips_unknown_extensions(self, tmp_path, caplog): + (tmp_path / "keep.txt").write_text("keep", encoding="utf-8") + (tmp_path / "stray.xyz").write_text("ignored", encoding="utf-8") + + op = DocumentLoaderOperator(task_id="test", source_path=str(tmp_path)) + with caplog.at_level(logging.WARNING): + result = op.execute(context=MagicMock()) + + assert len(result) == 1 + assert result[0]["text"] == "keep" + assert any(".xyz" in record.message for record in caplog.records) + def test_file_extensions_filter(self, tmp_path): (tmp_path / "keep.txt").write_text("keep me", encoding="utf-8") (tmp_path / "skip.md").write_text("skip me", encoding="utf-8") @@ -272,7 +402,7 @@ def test_empty_directory_raises_file_not_found(self, tmp_path): with pytest.raises(FileNotFoundError, match="No files found"): op.execute(context=MagicMock()) - def test_unknown_extension_raises(self, tmp_path): + def test_unknown_extension_on_single_file_raises(self, tmp_path): f = tmp_path / "data.xyz" f.write_text("some data", encoding="utf-8") @@ -296,6 +426,40 @@ def test_file_extensions_case_insensitive(self, tmp_path): assert result[0]["text"] == "keep me" +class TestEncoding: + def test_strict_utf8_default_raises_with_path_context(self, tmp_path): + f = tmp_path / "latin1.csv" + # \xff is invalid in UTF-8; surface the file path in the error. + f.write_bytes(b"\xff\xfe header\nrow\n") + + op = DocumentLoaderOperator(task_id="test", source_path=str(f)) + with pytest.raises(ValueError, match=str(f)): + op.execute(context=MagicMock()) + + def test_encoding_errors_replace_tolerates_garbage(self, tmp_path): + f = tmp_path / "mixed.txt" + f.write_bytes(b"hello \xff world") + + op = DocumentLoaderOperator( + task_id="test", + source_path=str(f), + encoding_errors="replace", + ) + result = op.execute(context=MagicMock()) + + assert "hello" in result[0]["text"] + assert "world" in result[0]["text"] + + def test_alternative_encoding_succeeds(self, tmp_path): + f = tmp_path / "latin1.txt" + f.write_bytes("café".encode("latin-1")) + + op = DocumentLoaderOperator(task_id="test", source_path=str(f), encoding="latin-1") + result = op.execute(context=MagicMock()) + + assert "café" in result[0]["text"] + + class TestOutputShape: def test_every_item_has_text_and_metadata(self, tmp_path): (tmp_path / "a.txt").write_text("doc a", encoding="utf-8") @@ -324,6 +488,21 @@ def test_metadata_fields_appended(self, tmp_path): assert result[0]["metadata"]["source"] == "test_suite" assert result[0]["metadata"]["version"] == 2 + def test_metadata_fields_do_not_override_auto_extracted(self, tmp_path): + """Auto-extracted file_name wins over a same-key entry in metadata_fields.""" + f = tmp_path / "report.txt" + f.write_text("content", encoding="utf-8") + + op = DocumentLoaderOperator( + task_id="test", + source_path=str(f), + metadata_fields={"file_name": "spoofed", "extra": "kept"}, + ) + result = op.execute(context=MagicMock()) + + assert result[0]["metadata"]["file_name"] == "report.txt" + assert result[0]["metadata"]["extra"] == "kept" + def test_file_metadata_included(self, tmp_path): f = tmp_path / "report.txt" f.write_text("content", encoding="utf-8") diff --git a/uv.lock b/uv.lock index 73e405dd33856..fa0f1ed32d7d4 100644 --- a/uv.lock +++ b/uv.lock @@ -4209,12 +4209,14 @@ bedrock = [ common-sql = [ { name = "apache-airflow-providers-common-sql" }, ] +docx = [ + { name = "python-docx" }, +] google = [ { name = "pydantic-ai-slim", extra = ["google"] }, ] langchain = [ { name = "langchain" }, - { name = "langchain-openai" }, ] mcp = [ { name = "pydantic-ai-slim", extra = ["mcp"] }, @@ -4225,6 +4227,9 @@ openai = [ parquet = [ { name = "pyarrow" }, ] +pdf = [ + { name = "pypdf" }, +] sql = [ { name = "apache-airflow-providers-common-sql" }, { name = "sqlglot" }, @@ -4238,6 +4243,7 @@ dev = [ { name = "apache-airflow-providers-common-sql", extra = ["datafusion"] }, { name = "apache-airflow-providers-standard" }, { name = "apache-airflow-task-sdk" }, + { name = "langchain" }, { name = "pydantic-ai-slim", extra = ["mcp"] }, { name = "sqlglot" }, ] @@ -4255,7 +4261,6 @@ requires-dist = [ { name = "fastavro", marker = "python_full_version >= '3.14' and extra == 'avro'", specifier = ">=1.12.1" }, { name = "fastavro", marker = "python_full_version < '3.14' and extra == 'avro'", specifier = ">=1.10.0" }, { name = "langchain", marker = "extra == 'langchain'", specifier = ">=1.0.0" }, - { name = "langchain-openai", marker = "extra == 'langchain'", specifier = ">=0.3.0" }, { name = "pyarrow", marker = "python_full_version >= '3.14' and extra == 'parquet'", specifier = ">=22.0.0" }, { name = "pyarrow", marker = "python_full_version < '3.14' and extra == 'parquet'", specifier = ">=18.0.0" }, { name = "pydantic-ai-slim", specifier = ">=1.34.0" }, @@ -4264,9 +4269,11 @@ requires-dist = [ { name = "pydantic-ai-slim", extras = ["google"], marker = "extra == 'google'" }, { name = "pydantic-ai-slim", extras = ["mcp"], marker = "extra == 'mcp'" }, { name = "pydantic-ai-slim", extras = ["openai"], marker = "extra == 'openai'" }, + { name = "pypdf", marker = "extra == 'pdf'", specifier = ">=4.0.0" }, + { name = "python-docx", marker = "extra == 'docx'", specifier = ">=1.0.0" }, { name = "sqlglot", marker = "extra == 'sql'", specifier = ">=30.0.0" }, ] -provides-extras = ["anthropic", "bedrock", "google", "openai", "mcp", "avro", "parquet", "sql", "common-sql", "langchain"] +provides-extras = ["anthropic", "bedrock", "google", "openai", "mcp", "avro", "parquet", "sql", "common-sql", "langchain", "pdf", "docx"] [package.metadata.requires-dev] dev = [ @@ -4277,6 +4284,7 @@ dev = [ { name = "apache-airflow-providers-common-sql", extras = ["datafusion"], editable = "providers/common/sql" }, { name = "apache-airflow-providers-standard", editable = "providers/standard" }, { name = "apache-airflow-task-sdk", editable = "task-sdk" }, + { name = "langchain", specifier = ">=1.0.0" }, { name = "pydantic-ai-slim", extras = ["mcp"] }, { name = "sqlglot", specifier = ">=30.0.0" }, ] @@ -14587,20 +14595,6 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/0f/1a/86c38c27b81913a1c6c12448cab55defb5a1097c7dc9a4cea83f55477a2d/langchain_core-1.4.0-py3-none-any.whl", hash = "sha256:23cbbdb46e38ddd1dd5247e6167e96013eae74bea4c5949c550809970a9e565c", size = 548120, upload-time = "2026-05-11T18:42:33.992Z" }, ] -[[package]] -name = "langchain-openai" -version = "1.2.1" -source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "langchain-core" }, - { name = "openai" }, - { name = "tiktoken" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/9a/0e/d8e16c28aa67106d285e63b8ffc04c5af68341e345ce24a0751dbf2e167e/langchain_openai-1.2.1.tar.gz", hash = "sha256:ee4480b787706361b7125fad46930589a624df87aa158c6986ef1fad10d10675", size = 1146092, upload-time = "2026-04-24T19:46:43.328Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/dc/55/2865b18ee3a3dd11160b8c4b2cf37e75bf2a4a8d1d38868ffffc7b7cc180/langchain_openai-1.2.1-py3-none-any.whl", hash = "sha256:a80732185030d4f453dda6c25feef46f645f665423fdffe38ae3edf1ac3c6c4d", size = 98626, upload-time = "2026-04-24T19:46:41.971Z" }, -] - [[package]] name = "langchain-protocol" version = "0.0.15" @@ -18862,6 +18856,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/10/bd/c038d7cc38edc1aa5bf91ab8068b63d4308c66c4c8bb3cbba7dfbc049f9c/pyparsing-3.3.2-py3-none-any.whl", hash = "sha256:850ba148bd908d7e2411587e247a1e4f0327839c40e2e5e6d05a007ecc69911d", size = 122781, upload-time = "2026-01-21T03:57:55.912Z" }, ] +[[package]] +name = "pypdf" +version = "6.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions", marker = "python_full_version < '3.11'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/bf/58/6dd97d78a4b17a7a6b9d1c6ad23895abc41f0fdc49c553cc05bdfdcc36d0/pypdf-6.11.0.tar.gz", hash = "sha256:062b51c81b0910e6d2755e99e1c5547a0a23b7d0a32322af66240d8edcfabe87", size = 6453975, upload-time = "2026-05-09T13:26:48.955Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/07/b1/68feb7eb3b99f0c020b414234825f4a5d70e0126c18d933770e8c93a35fc/pypdf-6.11.0-py3-none-any.whl", hash = "sha256:769394d5756d5b304c9b6bef88b54b1816b328e7e6fc9254e625529a15ed4ab8", size = 338819, upload-time = "2026-05-09T13:26:46.904Z" }, +] + [[package]] name = "pyproject-hooks" version = "1.2.0" @@ -19215,6 +19221,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/6f/a05a317a66fee0aad270011461f1a63a453ed12471249f172f7d2e2bc7b4/python_discovery-1.3.1-py3-none-any.whl", hash = "sha256:ed188687ebb3b82c01a17cd5ac62fc94d9f6487a7f1a0f9dfe89753fec91039c", size = 33185, upload-time = "2026-05-12T20:53:34.969Z" }, ] +[[package]] +name = "python-docx" +version = "1.2.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "lxml" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a9/f7/eddfe33871520adab45aaa1a71f0402a2252050c14c7e3009446c8f4701c/python_docx-1.2.0.tar.gz", hash = "sha256:7bc9d7b7d8a69c9c02ca09216118c86552704edc23bac179283f2e38f86220ce", size = 5723256, upload-time = "2025-06-16T20:46:27.921Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d0/00/1e03a4989fa5795da308cd774f05b704ace555a70f9bf9d3be057b680bcf/python_docx-1.2.0-py3-none-any.whl", hash = "sha256:3fd478f3250fbbbfd3b94fe1e985955737c145627498896a8a6bf81f4baf66c7", size = 252987, upload-time = "2025-06-16T20:46:22.506Z" }, +] + [[package]] name = "python-dotenv" version = "1.2.2" From d317ef38d60f3eae17fcd21e7651769e0202ef06 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 20 May 2026 13:16:55 +0100 Subject: [PATCH 4/5] Add cloud-storage URI support, no-chunking note, and format roadmap Addresses three follow-ups from the post-rewrite review (after #67120's initial refactor landed in 8f3aee40f0): 1. Cloud storage URIs via ObjectStoragePath - ``source_path`` now accepts any URI ObjectStoragePath resolves through fsspec (``s3://``, ``gs://``, ``azure://``, ``file://``). Falls back to the existing ``pathlib`` + ``glob`` code path for bare local paths so no existing behaviour changes. - New ``source_conn_id`` parameter to point at the Airflow connection that holds the cloud credentials (``aws_default``, ``google_cloud_default``, ...). Templated so it can be set per-DAG-run. - Parsers stay polymorphic over ``Path`` / ``ObjectStoragePath`` -- both expose ``read_bytes``, ``open``, ``name``, ``suffix`` so the existing read paths work unchanged. - Cross-directory globs in cloud URIs are explicitly not supported in this version; ``source_path`` accepts a single object or a directory. Documented. 2. Loader-not-chunker explicit - Operator docstring and new "No chunking" docs section make it clear the operator parses files into documents but never splits them. The right chunking strategy depends on the embedding model, so it stays in the downstream operator's hands (LlamaIndex EmbeddingOperator, LangChain text splitters, ...). 3. Format coverage roadmap - New docs section enumerates the formats deferred to follow-ups (.pptx, .epub, .xlsx, .html, image OCR, audio transcription), each behind its own optional extra, so reviewers and users see the scope choice explicitly rather than guessing what's missing. Tests - New ``TestCloudUriDispatch`` class covering: single-object URI returns one document, directory URI iterates children, neither-file-nor-dir URI raises with a clear error. ObjectStoragePath is mocked so the tests don't touch real cloud storage. Other ecosystems compared (LangChain BaseLoader + per-format classes; LlamaIndex BaseReader / SimpleDirectoryReader with fsspec; OpenAI / Anthropic / pydantic-ai don't have document-loader abstractions and delegate parsing to the model) -- this commit closes the remaining gap vs LlamaIndex on cloud storage and matches the LangChain naming / output-shape convention. --- .../ai/docs/operators/document_loader.rst | 63 ++++++++++++-- .../example_dags/example_document_loader.py | 20 +++++ .../common/ai/operators/document_loader.py | 83 ++++++++++++++++--- .../ai/operators/test_document_loader.py | 63 ++++++++++++++ 4 files changed, 211 insertions(+), 18 deletions(-) diff --git a/providers/common/ai/docs/operators/document_loader.rst b/providers/common/ai/docs/operators/document_loader.rst index cb238d1811035..8a836c37d120e 100644 --- a/providers/common/ai/docs/operators/document_loader.rst +++ b/providers/common/ai/docs/operators/document_loader.rst @@ -140,6 +140,34 @@ the field shape is more complex than what ``json_text_field`` covers: for r in records ] +No chunking +----------- + +The operator parses files into documents; it does **not** split them into +fixed-size chunks. The right chunking strategy depends on the embedding +model and is intentionally left to a downstream text-splitter or embedding +operator (LlamaIndex's ``EmbeddingOperator``, LangChain's text splitters, +...). + +Format coverage roadmap +----------------------- + +The current built-in dispatch covers ``.txt``, ``.md``, ``.csv``, ``.json``, +``.pdf``, ``.docx``. Additional formats are deferred to follow-ups, each +gated behind its own extra so users only install what they need: + +- ``.pptx`` via ``python-pptx`` +- ``.epub`` via ``ebooklib`` +- ``.xlsx`` via ``openpyxl`` +- ``.html`` / ``.htm`` via ``beautifulsoup4`` +- Image OCR (``.png`` / ``.jpg``) via ``pytesseract`` +- Audio transcription via a model call (``LLMOperator`` or ``AgentOperator`` + is a better fit for transcription than this parser) + +For anything not in the dispatch map, set ``parser`` explicitly (``"text"`` +to read as plain text) or write the parser inline in a ``@task`` that calls +``DocumentLoaderOperator`` with ``source_bytes`` for known formats. + Composing with downstream embedding operators --------------------------------------------- @@ -161,11 +189,27 @@ directly into embedding operators. With LlamaIndex's ``EmbeddingOperator``: load >> embed -Composing with Airflow providers --------------------------------- +Cloud storage URIs +------------------ + +``source_path`` accepts any URI that +:class:`~airflow.sdk.ObjectStoragePath` resolves via fsspec +(``s3://``, ``gs://``, ``azure://``, ``file://``, ...). Point it at a +single object or a directory; cross-directory globs in cloud URIs are not +supported in this version. + +.. exampleinclude:: /../../ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py + :language: python + :start-after: [START howto_operator_document_loader_cloud_uri] + :end-before: [END howto_operator_document_loader_cloud_uri] + +Use ``source_conn_id`` to point at the Airflow connection that holds the +cloud credentials (``aws_default``, ``google_cloud_default``, ...). For +single-file URIs, ``source_conn_id`` works the same way. -Use any Airflow provider to download files, then parse them with -``DocumentLoaderOperator``: +If you'd rather download the file with a dedicated provider operator +first (e.g. to get retry semantics specific to that storage), the +download-then-parse pattern still works: .. code-block:: python @@ -214,8 +258,15 @@ Parameters * - Parameter - Description * - ``source_path`` - - Local file, directory, or glob pattern. ``**`` is recursive. Mutually - exclusive with ``source_bytes``. + - Local file, directory, or glob pattern, **or** a storage URI + (``s3://``, ``gs://``, ``azure://``, ``file://``) resolved via + :class:`~airflow.sdk.ObjectStoragePath`. ``**`` is recursive for + local globs; cross-directory globs in cloud URIs are not supported. + Mutually exclusive with ``source_bytes``. + * - ``source_conn_id`` + - Airflow connection ID for the cloud-storage credentials used by + ``ObjectStoragePath`` (``aws_default``, ``google_cloud_default``, + ...). Ignored for local paths. * - ``source_bytes`` - Raw file bytes from XCom. Requires ``file_type``. Mutually exclusive with ``source_path``. Not a template field (bytes don't survive Jinja). diff --git a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py index 245b9d83ed3a3..aa80c77d32d12 100644 --- a/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py +++ b/providers/common/ai/src/airflow/providers/common/ai/example_dags/example_document_loader.py @@ -123,3 +123,23 @@ def example_document_loader_json_field(): # [END howto_operator_document_loader_json_field] example_document_loader_json_field() + + +# [START howto_operator_document_loader_cloud_uri] +@dag(schedule=None) +def example_document_loader_cloud_uri(): + """Read PDFs directly from S3 -- no separate download step.""" + + load_docs = DocumentLoaderOperator( + task_id="load_docs", + source_path="s3://my-bucket/reports/", + source_conn_id="aws_default", + file_extensions=[".pdf"], + ) + + load_docs + + +# [END howto_operator_document_loader_cloud_uri] + +example_document_loader_cloud_uri() diff --git a/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py index d8872b2943277..7f563d0aa4adc 100644 --- a/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py +++ b/providers/common/ai/src/airflow/providers/common/ai/operators/document_loader.py @@ -33,6 +33,14 @@ from airflow.sdk import Context +# Type alias for path-like inputs that the parsers can read from. ``Path`` is +# the local filesystem; ``ObjectStoragePath`` covers ``s3://``, ``gs://``, +# ``azure://``, ``file://``, ... via fsspec. Both expose the methods we need +# (``read_bytes``, ``open``, ``name``, ``suffix``) so the parsers stay +# polymorphic. +FilePathT = Any # Path | ObjectStoragePath + + class DocumentLoaderOperator(BaseOperator): """ Parse files into ``list[dict(text, metadata)]`` for downstream embedding. @@ -53,8 +61,19 @@ class DocumentLoaderOperator(BaseOperator): ``source_bytes``, ``file_type`` is required so the operator knows which parser to use. - :param source_path: Local file path or glob pattern (e.g. - ``/data/*.pdf``). ``**`` enables recursive matching. + The operator is intentionally a **loader**: it does not split documents + into fixed-size chunks. Pass the output to a downstream text-splitter or + embedding operator if you need chunking. + + :param source_path: A local path, glob pattern, or storage URI + (``s3://``, ``gs://``, ``azure://``, ``file://``, ...). Cloud URIs + go through :class:`~airflow.sdk.ObjectStoragePath` / fsspec. + ``**`` enables recursive matching for local globs. Cloud URIs + accept a single file or a directory; cross-directory globs in a + cloud URI are not supported in this version. + :param source_conn_id: Airflow connection ID used by + ``ObjectStoragePath`` for cloud URIs (``aws_default``, + ``google_cloud_default``, ...). Ignored for local paths. :param source_bytes: Raw file bytes, typically from XCom. :param file_type: File extension hint when using ``source_bytes`` (e.g. ``".pdf"``). Also accepted with ``source_path`` to override @@ -85,6 +104,7 @@ class DocumentLoaderOperator(BaseOperator): template_fields: Sequence[str] = ( "source_path", + "source_conn_id", "file_type", "file_extensions", "parser", @@ -104,6 +124,7 @@ def __init__( self, *, source_path: str | None = None, + source_conn_id: str | None = None, source_bytes: bytes | None = None, file_type: str | None = None, parser: str = "auto", @@ -123,6 +144,7 @@ def __init__( raise ValueError("'file_type' is required when using 'source_bytes' (e.g. '.pdf').") self.source_path = source_path + self.source_conn_id = source_conn_id self.source_bytes = source_bytes self.file_type = file_type self.parser = parser @@ -161,36 +183,73 @@ def execute(self, context: Context) -> list[dict[str, Any]]: self.log.info("Parsed %d documents from %d file(s)", len(documents), file_count) return documents - def _resolve_files(self, source_path: str) -> list[Path]: + def _resolve_files(self, source_path: str) -> list[FilePathT]: + # A storage URI (``s3://``, ``gs://``, ``file://``, ...) goes through + # ObjectStoragePath / fsspec; a bare local path keeps the existing + # glob behaviour. The heuristic is intentionally simple: presence of + # ``://`` indicates a URI. + if "://" in source_path: + return self._resolve_remote_files(source_path) + return self._resolve_local_files(source_path) + + def _resolve_local_files(self, source_path: str) -> list[Path]: path = Path(source_path) if path.is_file(): return [path] if path.is_dir(): candidates = sorted(p for p in path.iterdir() if not p.name.startswith(".")) + is_directory_mode = True else: # `recursive=True` makes `**` match across directories per the docstring. candidates = [Path(p) for p in sorted(glob.glob(source_path, recursive=True))] + is_directory_mode = False - results = [p for p in candidates if p.is_file()] + return self._filter_files([p for p in candidates if p.is_file()], is_directory_mode=is_directory_mode) + def _resolve_remote_files(self, source_path: str) -> list[FilePathT]: + from airflow.sdk import ObjectStoragePath + + root = ObjectStoragePath(source_path, conn_id=self.source_conn_id) + try: + if root.is_file(): + return [root] + except FileNotFoundError: + # Some fsspec backends raise instead of returning False. + pass + + if not root.is_dir(): + raise FileNotFoundError( + f"Cloud URI '{source_path}' is neither a file nor a directory. " + "Cross-directory globs in cloud URIs aren't supported here; " + "point ``source_path`` at a single object or a directory." + ) + + candidates = sorted( + (p for p in root.iterdir() if not p.name.startswith(".")), + key=str, + ) + return self._filter_files([p for p in candidates if p.is_file()], is_directory_mode=True) + + def _filter_files(self, results: list[FilePathT], *, is_directory_mode: bool) -> list[FilePathT]: if self.file_extensions: allowed = {(ext if ext.startswith(".") else f".{ext}").lower() for ext in self.file_extensions} - results = [p for p in results if p.suffix.lower() in allowed] - elif path.is_dir(): - # In directory mode with no explicit filter: skip files we don't - # know how to parse rather than crashing on the first .DS_Store - # or editor temp file. A glob pattern is treated as intentional. + return [p for p in results if p.suffix.lower() in allowed] + + if is_directory_mode: + # No explicit filter in directory mode: skip files we don't know + # how to parse rather than crashing on the first stray file + # (``.DS_Store``, editor swap files, etc.). A glob is treated as + # intentional and parsed via the explicit ``parser`` argument. known = set(self.EXTENSION_BACKEND_MAP.keys()) unknown = [p for p in results if p.suffix.lower() not in known] if unknown: self.log.warning( - "Skipping %d file(s) with unrecognised extension in '%s': %s", + "Skipping %d file(s) with unrecognised extension: %s", len(unknown), - source_path, ", ".join(sorted({p.suffix or "" for p in unknown})), ) - results = [p for p in results if p.suffix.lower() in known] + return [p for p in results if p.suffix.lower() in known] return results diff --git a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py index 8740af23d0f60..9c6699ee34588 100644 --- a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py +++ b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py @@ -426,6 +426,69 @@ def test_file_extensions_case_insensitive(self, tmp_path): assert result[0]["text"] == "keep me" +class TestCloudUriDispatch: + """``source_path`` containing a URI scheme routes through ObjectStoragePath.""" + + @patch("airflow.sdk.ObjectStoragePath") + def test_single_object_uri_returns_one_document(self, mock_osp_cls): + mock_obj = MagicMock() + mock_obj.is_file.return_value = True + mock_obj.suffix = ".txt" + mock_obj.name = "report.txt" + mock_obj.read_bytes.return_value = b"cloud content" + mock_obj.__str__ = lambda self: "s3://bucket/dir/report.txt" + mock_osp_cls.return_value = mock_obj + + op = DocumentLoaderOperator( + task_id="test", + source_path="s3://bucket/dir/report.txt", + source_conn_id="aws_default", + ) + result = op.execute(context=MagicMock()) + + mock_osp_cls.assert_called_once_with("s3://bucket/dir/report.txt", conn_id="aws_default") + assert len(result) == 1 + assert result[0]["text"] == "cloud content" + assert result[0]["metadata"]["file_name"] == "report.txt" + + @patch("airflow.sdk.ObjectStoragePath") + def test_directory_uri_iterates_children(self, mock_osp_cls): + # Root is a directory; iterdir yields two text files. + def _mock_child(name: str, content: bytes): + child = MagicMock() + child.is_file.return_value = True + child.name = name + child.suffix = "." + name.rsplit(".", 1)[-1] + child.read_bytes.return_value = content + child.__str__ = lambda self: f"s3://bucket/dir/{name}" + return child + + a = _mock_child("a.txt", b"alpha") + b = _mock_child("b.txt", b"beta") + + root = MagicMock() + root.is_file.return_value = False + root.is_dir.return_value = True + root.iterdir.return_value = [a, b] + mock_osp_cls.return_value = root + + op = DocumentLoaderOperator(task_id="test", source_path="s3://bucket/dir/") + result = op.execute(context=MagicMock()) + + assert {doc["text"] for doc in result} == {"alpha", "beta"} + + @patch("airflow.sdk.ObjectStoragePath") + def test_neither_file_nor_dir_uri_raises(self, mock_osp_cls): + bad = MagicMock() + bad.is_file.return_value = False + bad.is_dir.return_value = False + mock_osp_cls.return_value = bad + + op = DocumentLoaderOperator(task_id="test", source_path="s3://bucket/missing") + with pytest.raises(FileNotFoundError, match="neither a file nor a directory"): + op.execute(context=MagicMock()) + + class TestEncoding: def test_strict_utf8_default_raises_with_path_context(self, tmp_path): f = tmp_path / "latin1.csv" From 79e74b336689a23b24c6e6b7f918af6cdee28498 Mon Sep 17 00:00:00 2001 From: Kaxil Naik Date: Wed, 20 May 2026 14:57:55 +0100 Subject: [PATCH 5/5] Drop test-only ``__str__`` overrides that tripped CI mypy CI's MyPy providers job flagged the `mock.__str__ = lambda ...` and `mock.__str__.return_value = ...` patterns in TestCloudUriDispatch with ``[method-assign]`` -- mypy treats `__str__` as a real method that shouldn't be reassigned at the instance level, even on a MagicMock. The tests only assert on `file_name`, the dispatched call args, and text content; they never check `metadata.file_path` (which is what `str(path)` would feed). Removing the overrides keeps the assertions intact and lets mypy pass. --- .../tests/unit/common/ai/operators/test_document_loader.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py index 9c6699ee34588..bb9db83347cf3 100644 --- a/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py +++ b/providers/common/ai/tests/unit/common/ai/operators/test_document_loader.py @@ -431,12 +431,14 @@ class TestCloudUriDispatch: @patch("airflow.sdk.ObjectStoragePath") def test_single_object_uri_returns_one_document(self, mock_osp_cls): + # `str(mock_obj)` returns whatever MagicMock renders; we only assert + # the file_name field, not file_path, so leaving __str__ default is + # fine and avoids mypy's method-assign complaint. mock_obj = MagicMock() mock_obj.is_file.return_value = True mock_obj.suffix = ".txt" mock_obj.name = "report.txt" mock_obj.read_bytes.return_value = b"cloud content" - mock_obj.__str__ = lambda self: "s3://bucket/dir/report.txt" mock_osp_cls.return_value = mock_obj op = DocumentLoaderOperator( @@ -460,7 +462,6 @@ def _mock_child(name: str, content: bytes): child.name = name child.suffix = "." + name.rsplit(".", 1)[-1] child.read_bytes.return_value = content - child.__str__ = lambda self: f"s3://bucket/dir/{name}" return child a = _mock_child("a.txt", b"alpha")