In [None]:
import os
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Sequence, cast

import pandas as pd


In [None]:
# Split chunks together with their vectors (per the file name).
# Override the location with the CHUNKS_PARQUET_PATH environment variable; the
# default is resolved against the kernel's working directory (in Jupyter this is
# normally the notebook's folder).
# NOTE(review): the author's copy sits in chapter13_genai/recipe_book/notebooks/ —
# run the notebook from there or set CHUNKS_PARQUET_PATH.
DATA_PATH = os.environ.get("CHUNKS_PARQUET_PATH", "splitted_with_vectors.parquet")
data = pd.read_parquet(DATA_PATH)
# Show a preview only; the full frame includes wide vector columns.
data.head()

In [None]:
# Column holding each chunk's hash; used as the id of every chunk/object.
uuid_column = "chunk_sha"
# Key the chunks are grouped by when comparing against the collection. The
# grouping cell groups by "chunk", and the lookup cell reads `document_column`,
# so define it here to keep them in sync.
# NOTE(review): confirm "chunk" is the intended document key.
document_column = "chunk"

# NOTE(review): not used by any visible cell; kept in case later code relies on it.
unique_columns = sorted(data.columns.to_list())

# Keep one row per id (first occurrence wins).
data = data.drop_duplicates(subset=[uuid_column], keep="first")
# Every row needs a usable id before it can be compared or ingested.
assert data[uuid_column].notna().all(), f"missing values in {uuid_column!r}"



In [None]:
def _prepare_document_to_uuid_map(
    data: Sequence[Mapping], group_key: str, get_value
) -> dict[str, set]:
    """Prepare the map of grouped_key to set."""
    grouped_key_to_set: dict = {}
    for item in data:
        document_url = item[group_key]

        if document_url not in grouped_key_to_set:
            grouped_key_to_set[document_url] = set()

        grouped_key_to_set[document_url].add(get_value(item))
    return grouped_key_to_set


# Map each input document key ("chunk") to the set of ids found for it in `data`.
# Reuse the `uuid_column` constant so this cell follows any change to it.
input_documents_to_uuid = _prepare_document_to_uuid_map(
    data=data.to_dict("records"),
    group_key="chunk",
    get_value=lambda row: row[uuid_column],
)

# The mapping has one entry per distinct chunk; preview a few instead of dumping it all.
pd.Series(input_documents_to_uuid).head()



In [None]:
def _get_documents_to_uuid_map(
    collection,
    data: pd.DataFrame,
    document_column: str,
    uuid_column: str,
    offset: int = 0,
    limit: int = 2000,
) -> dict[str, set]:
    """Fetch, page by page, the ids already stored for each document key in ``data``.

    Example (needs a live collection):
        existing_documents_to_uuid = _get_documents_to_uuid_map(
            collection, data, document_column, uuid_column
        )

    Args:
        collection: Collection object exposing ``query.fetch_objects(...)``
            (presumably a Weaviate v4 collection — confirm).
        data: Input rows; the distinct values of ``document_column`` are looked up.
        document_column: Property identifying which document an object belongs to.
        uuid_column: Property holding the object id. When fetched objects do not
            carry it, ``str(obj.uuid)`` is used instead.
        offset: Index of the first object to fetch.
        limit: Page size of each fetch.

    Returns:
        Mapping of document key -> set of ids stored in the collection for it.
    """
    documents_to_uuid: dict[str, set] = {}
    document_keys = set(data[document_column])
    if not document_keys:
        # Nothing to look up.
        return documents_to_uuid

    # The filter does not change between pages, so build it once.
    key_filter = Filter.any_of(
        [Filter.by_property(document_column).equal(key) for key in document_keys]
    )

    while True:
        page = collection.query.fetch_objects(
            filters=key_filter,
            return_properties=[document_column],
            limit=limit,
            offset=offset,
        )
        if not page.objects:
            break
        offset += limit

        if uuid_column in page.objects[0].properties:
            rows = [obj.properties for obj in page.objects]
        else:
            # The id is not a stored property: fall back to the object's own uuid.
            rows = []
            for obj in page.objects:
                row = dict(obj.properties)
                row[uuid_column] = str(obj.uuid)
                rows.append(row)

        page_map = _prepare_document_to_uuid_map(
            data=rows,
            group_key=document_column,
            get_value=lambda x: x[uuid_column],
        )
        # Merge instead of dict.update(): a document whose objects span several
        # pages would otherwise keep only the ids from its last page.
        for key, ids in page_map.items():
            documents_to_uuid.setdefault(key, set()).update(ids)

    return documents_to_uuid


In [None]:
# Segregate the input documents by comparing each document's set of ids with the
# set already stored in the collection:
#   new       -> key absent from the collection,
#   changed   -> key present but with a different set of ids,
#   unchanged -> key present with exactly the same set of ids.
# NOTE(review): `existing_documents_to_uuid` (document key -> set of stored ids) is
# not defined by any earlier cell; it must be fetched from a live collection first.
changed_documents = set()
unchanged_docs = set()
new_documents = set()

for doc_url, doc_set in input_documents_to_uuid.items():
    if doc_url not in existing_documents_to_uuid:
        new_documents.add(str(doc_url))
    elif existing_documents_to_uuid[doc_url] != doc_set:
        changed_documents.add(str(doc_url))
    else:
        unchanged_docs.add(str(doc_url))

# Summary: (changed, unchanged, new) counts.
len(changed_documents), len(unchanged_docs), len(new_documents)

In [None]:

    # NOTE(review): this cell is an indented method body kept for reference. It
    # cannot run as a notebook cell: it uses `self` and the names `existing`,
    # `verbose`, `vector_column`, `collection_name` and `batch_delete_error`, which
    # no cell in this notebook defines.
    #
    # Flow: split the input documents into changed / unchanged / new relative to the
    # collection, then, depending on `existing`:
    #   "error"   -> raise ValueError if any document changed,
    #   "skip"    -> keep only rows of new documents,
    #   "replace" -> delete the stored objects of changed documents, then keep rows
    #                of new and changed documents,
    # and finally batch-insert whatever rows remain in `data`.
    (
        documents_to_uuid_map,
        changed_documents,
        unchanged_documents,
        new_documents,
    ) = self._get_segregated_documents(
        data=data,
        document_column=document_column,
        uuid_column=uuid_column,
        collection_name=collection_name,
    )
    if verbose:
        self.log.info(
            "Found %s changed documents, %s unchanged documents and %s non-existing documents",
            len(changed_documents),
            len(unchanged_documents),
            len(new_documents),
        )
        for document in changed_documents:
            self.log.info(
                "Changed document: %s has %s objects.", document, len(documents_to_uuid_map[document])
            )
        # Logged even when `new_documents` is empty (then the joined list is "").
        self.log.info("Non-existing document: %s", ", ".join(new_documents))

    # NOTE(review): with existing == "error" and no changed documents, `data` is not
    # filtered, so rows of unchanged documents are batch-inserted again below —
    # confirm this is intended.
    if existing == "error" and len(changed_documents):
        raise ValueError(
            f"Documents {', '.join(changed_documents)} already exists. You can either skip or replace"
            f" them by passing 'existing=skip' or 'existing=replace' respectively."
        )
    elif existing == "skip":
        # Keep only rows whose document key was not found in the collection.
        data = data[data[document_column].isin(new_documents)]
        if verbose:
            self.log.info(
                "Since existing=skip, ingesting only non-existing document's object %s", data.shape[0]
            )
    elif existing == "replace":
        # Number of stored objects that belong to the changed documents.
        total_objects_count = sum([len(documents_to_uuid_map[doc]) for doc in changed_documents])
        if verbose:
            self.log.info(
                "Since existing='replace', deleting %s objects belonging changed documents %s",
                total_objects_count,
                changed_documents,
            )
        # NOTE(review): `batch_delete_error` is read here (as an argument) and again
        # after batch_data below, but this fragment only assigns it in this branch.
        # It must already be bound (e.g. to an empty list) before the fragment runs,
        # otherwise a NameError is raised.
        if list(changed_documents):
            batch_delete_error = self._delete_all_documents_objects(
                document_keys=list(changed_documents),
                document_column=document_column,
                collection_name=collection_name,
                total_objects_count=total_objects_count,
                batch_delete_error=batch_delete_error,
                verbose=verbose,
            )
        # Re-ingest rows of both new and (just deleted) changed documents.
        data = data[data[document_column].isin(new_documents.union(changed_documents))]
        # NOTE(review): unlike the other messages, this log is not gated on `verbose`.
        self.log.info("Batch inserting %s objects for non-existing and changed documents.", data.shape[0])

    if data.shape[0]:
        self.batch_data(
            collection_name=collection_name,
            data=data,
            vector_col=vector_column,
            uuid_col=uuid_column,
        )
        if batch_delete_error:
            # NOTE(review): this inner check repeats the outer one and is always true.
            if batch_delete_error:
                self.log.info("Failed to delete %s objects.", len(batch_delete_error))
            # Delete the objects whose "uuid" is listed in `batch_delete_error`
            # (intended as a rollback of objects that were not processed properly;
            # entries are presumably dicts with a "uuid" key — confirm).
            self._delete_objects(
                [item["uuid"] for item in batch_delete_error],
                collection_name=collection_name,
            )

    if verbose:
        # Log the result of a collection-wide aggregate with total_count=True.
        collection = self.get_collection(collection_name)
        self.log.info(
            "Total objects in collection %s : %s ",
            collection_name,
            collection.aggregate.over_all(total_count=True),
        )