In [1]:
from __future__ import annotations

import uuid
from typing import Any, Iterator

from ragna import Rag, source_storages, core
from ragna.core import DocumentUploadParameters, Source
from ragna.deploy import Config


class DummyDocument(core.Document):
    """Document stub with fixed, in-memory content.

    Every instance reports itself as readable and returns the same bytes from
    ``read``; only the name and metadata differ between instances.
    """

    # The first parameter is ``cls``, so this must be a classmethod. Without
    # the decorator a call on the class fails with a missing-argument
    # ``TypeError`` instead of reaching the ``NotImplementedError`` below.
    @classmethod
    async def get_upload_info(
        cls, *, config: Config, user: str, id: uuid.UUID, name: str
    ) -> tuple[dict[str, Any], DocumentUploadParameters]:
        """Uploading is not supported by this stub.

        Raises:
            NotImplementedError: Always.
        """
        raise NotImplementedError

    @classmethod
    def new(cls, name: str, **metadata: Any) -> DummyDocument:
        """Create a document called ``name`` with ``metadata`` as its metadata.

        Args:
            name: Name of the document.
            **metadata: Arbitrary key/value pairs stored as the document metadata.

        Returns:
            A new instance using a ``core.PlainTextDocumentHandler`` as handler.
        """
        return cls(
            name=name,
            metadata=metadata,
            handler=core.PlainTextDocumentHandler(),
        )

    def is_readable(self) -> bool:
        """Return ``True``; the content is hard-coded and always available."""
        return True

    def read(self) -> bytes:
        """Return the fixed content shared by all dummy documents."""
        return b"This is a test document."

class DummyAssistant(core.Assistant):
    """Assistant stub that answers with the names of the sources it received."""

    def answer(self, prompt: str, sources: list[Source]) -> Iterator[str]:
        """Yield one chunk listing each source's document name as a bullet.

        The ``prompt`` is ignored. The bullets (``- <document_name>``) are
        joined by newlines and yielded as a single string.
        """
        bullet_lines = [f"- {source.document_name}" for source in sources]
        yield "\n".join(bullet_lines)


async def main(metadatas, metadata_filters):
    """Store dummy documents and print the chat answer for each metadata filter.

    One ``DummyDocument`` named ``document{idx}.txt`` is created per entry of
    ``metadatas``, with the entry's position added to its metadata as ``idx``.
    All documents are stored in a ``Chroma`` source storage under the corpus
    ``"dummy_corpus"``. For every item of ``metadata_filters`` a new chat is
    created with that item as ``input`` and the answer to ``"?"`` is printed.

    Args:
        metadatas: Iterable of metadata mappings, one per document. Each is
            unpacked as keyword arguments to ``DummyDocument.new``.
        metadata_filters: Iterable of values passed as ``input`` to
            ``Rag().chat``.
    """
    corpus = "dummy_corpus"
    separator = "-" * 80

    # Build all documents first, then list them, so nothing is printed if
    # constructing any of them fails.
    documents = []
    for idx, metadata in enumerate(metadatas):
        name = f"document{idx}.txt"
        documents.append(DummyDocument.new(name, idx=idx, **metadata))
    for doc in documents:
        print(f"- {doc.name}: {doc.metadata}")

    storage = source_storages.Chroma()
    storage.store(documents, corpus_name=corpus)

    for current_filter in metadata_filters:
        print(separator)
        # The filter's string form followed by one blank line.
        print(current_filter, end="\n\n")

        chat = Rag().chat(
            input=current_filter,
            source_storage=storage,
            assistant=DummyAssistant,
            corpus_name=corpus,
        )
        print(await chat.answer("?"))


# One metadata dict per dummy document, built from (priority, department)
# pairs. Keys are inserted in the order "priority", then "department".
metadatas = [
    {"priority": priority, "department": department}
    for priority, department in [
        ("low", "legal"),
        ("medium", "legal"),
        ("low", "marketing"),
        ("high", "marketing"),
        ("medium", "marketing"),
    ]
]

metadata_filters = [
    core.MetadataFilter.eq("document_name", "document2.txt"),
    core.MetadataFilter.ge("idx", 3),
    core.MetadataFilter.eq("department", "legal"),
    core.MetadataFilter.in_("priority", ["medium", "high"]),
    core.MetadataFilter.and_(
        [
            core.MetadataFilter.eq("priority", "low"),
            core.MetadataFilter.eq("department", "legal"),
        ]
    ),
    core.MetadataFilter.and_(
        [
            core.MetadataFilter.or_(
                [
                    core.MetadataFilter.eq("priority", "medium"),
                    core.MetadataFilter.eq("department", "marketing"),
                ]
            ),
            core.MetadataFilter.lt("idx", 4),
        ]
    ),
    None
]

await main(metadatas, metadata_filters)

- document0.txt: {'idx': 0, 'priority': 'low', 'department': 'legal'}
- document1.txt: {'idx': 1, 'priority': 'medium', 'department': 'legal'}
- document2.txt: {'idx': 2, 'priority': 'low', 'department': 'marketing'}
- document3.txt: {'idx': 3, 'priority': 'high', 'department': 'marketing'}
- document4.txt: {'idx': 4, 'priority': 'medium', 'department': 'marketing'}
--------------------------------------------------------------------------------
EQ('document_name', 'document2.txt')

- document2.txt
- document2.txt
--------------------------------------------------------------------------------
GE('idx', 3)

- document4.txt
- document3.txt
- document4.txt
- document3.txt
--------------------------------------------------------------------------------
EQ('department', 'legal')

- document1.txt
- document1.txt
- document0.txt
- document0.txt
--------------------------------------------------------------------------------
IN('priority', ['medium', 'high'])

- document1.txt
- document4.txt
