In [1]:
# Environment check: print the path of the `pip` executable the shell resolves first on PATH.
!which pip

/home/abhi_ubuntu/projects/venv/bin/pip


In [2]:
# Environment check: print the path of the `python` executable the shell resolves first on PATH.
# NOTE(review): this reflects the shell's PATH; confirm it matches the running kernel if that matters.
!which python

/home/abhi_ubuntu/projects/venv/bin/python


In [3]:
from typing import Iterable, Dict, Any, List, Generator
from pymilvus import MilvusClient

def _norm_etag(x: str) -> str:
    return str(x).strip('"').strip("'") if x is not None else x

def _batched(seq: List[str], n: int) -> Iterable[List[str]]:
    for i in range(0, len(seq), n):
        yield seq[i:i+n]

def _query_existing_etags(client: MilvusClient, collection: str, etags: List[str], etag_field="etag", batch_size=500) -> set:
    """Return the etags from ``etags`` that Milvus already holds.

    Etags are looked up with ``<etag_field> in [...]`` filters, at most
    ``batch_size`` values per filter.

    Args:
        client: Milvus client used to run the queries.
        collection: Name of the collection to query.
        etags: Etag values to look up.
        etag_field: Name of the field that stores the etag.
        batch_size: Maximum number of etags placed in one ``in`` filter.

    Returns:
        A set with the ``_norm_etag``-normalised value of every etag that a
        query returned.
    """
    found = set()
    for batch in _batched(etags, batch_size):
        # Drop repeats (order preserved) so the filter carries each value once.
        pending = list(dict.fromkeys(batch))
        while pending:
            # Backslash-escape `\` and `"` so an etag containing either cannot
            # terminate the string literal early and corrupt the filter.
            # NOTE(review): assumes Milvus string literals use backslash escapes — confirm.
            literals = []
            for e in pending:
                escaped = str(e).replace("\\", "\\\\").replace('"', '\\"')
                literals.append(f'"{escaped}"')
            filt = f'{etag_field} in [{",".join(literals)}]'
            rows = client.query(
                collection_name=collection,
                filter=filt,
                output_fields=[etag_field],
                limit=len(pending),
            )
            matched = set()
            for r in rows:
                val = r.get(etag_field)
                if val is not None:
                    matched.add(str(val))
                    found.add(_norm_etag(val))
            # The limit counts rows, not distinct etags. If several entities
            # share one etag, the rows can fill the limit before every matching
            # etag has appeared. A short result means nothing was cut off.
            if len(rows) < len(pending):
                break
            # Result was full: ask again for the etags not seen yet.
            still_pending = [e for e in pending if str(e) not in matched]
            if len(still_pending) == len(pending):
                break  # no progress; stop rather than loop forever
            pending = still_pending
    return found

def iter_new_objects_by_etag(
    s3_client,
    bucket: str,
    prefix: str,
    milvus_client: MilvusClient,
    collection_name: str,
    etag_field: str = "etag",
    page_batch_check: int = 750,   # size for IN filter per page
) -> Generator[Dict[str, Any], None, None]:
    """
    Lazily list a bucket and yield the objects whose ETag Milvus does not know yet.

    The listing is consumed one ``list_objects_v2`` page at a time. For each
    page the objects are grouped by normalised ETag, the distinct ETags are
    checked against ``etag_field`` in ``collection_name`` (``page_batch_check``
    values per filter), and every object whose ETag was not found is yielded.
    Objects without an ETag are skipped. De-duplication is per page only.
    """
    listing = s3_client.get_paginator("list_objects_v2").paginate(
        Bucket=bucket, Prefix=prefix or ""
    )

    # Make sure the collection is loaded before the first existence query.
    milvus_client.load_collection(collection_name)

    for page in listing:
        page_objects = page.get("Contents", []) or []
        if not page_objects:
            continue

        # Group this page's objects under their normalised ETag.
        by_etag: Dict[str, List[Dict[str, Any]]] = {}
        for entry in page_objects:
            tag = _norm_etag(entry.get("ETag"))
            if tag:
                by_etag.setdefault(tag, []).append(entry)

        # _query_existing_etags splits the list into filters of
        # page_batch_check values itself, so one call covers the whole page.
        known = _query_existing_etags(
            milvus_client,
            collection_name,
            list(by_etag),
            etag_field=etag_field,
            batch_size=page_batch_check,
        )

        # Hand back every object whose ETag was not found.
        for tag, group in by_etag.items():
            if tag in known:
                continue
            yield from group  # streaming: caller can process/insert immediately


  from pkg_resources import DistributionNotFound, get_distribution


In [None]:
# Example usage: stream the objects that are not yet in Milvus and process them.
# NOTE(review): `s3_client` is not created in any visible cell; it must already
# exist (an S3-compatible client offering get_paginator("list_objects_v2")).
client = MilvusClient("http://YOUR_MILVUS_HOST:19530")

new_objs_iter = iter_new_objects_by_etag(
    s3_client=s3_client,
    bucket="my-bucket",
    prefix="optional/prefix/",
    milvus_client=client,
    collection_name="docs",     # has dynamic field "etag"
    etag_field="etag",
    page_batch_check=750
)

# Stream & process (no big lists in memory)
for obj in new_objs_iter:
    key  = obj["Key"]
    # Normalise exactly as the existence check does, so the value stored in
    # Milvus matches what later runs will look up.
    etag = _norm_etag(obj["ETag"])
    # -> download/extract/chunk/embed
    # -> insert into Milvus with {"vector": ..., "etag": etag, "doc_id": key, ...}
