### 线上RAG应用pdf文档频繁更新

用户将他们的PDF文档存储在磁盘的某个特定目录中，然后有一个定时任务来扫描此目录并从中的PDF文档构建知识库。

对于增量更新，做hash指纹这一点毋庸置疑，但是hash的对象不能是文件了，而应该聚焦于真实存到知识库的数据: document.

LangChain索引使用记录管理器(RecordManager)来跟踪写入矢量存储的文档。
当索引内容时，为每个文档计算哈希值，并将以下信息存储在记录管理器中:
文档hash(页面内容和元数据的散列)
写时间
源id——每个文档应该在其元数据中包含信息，以便我们确定该文档的最终来源

In [None]:
from langchain.embeddings import OpenAIEmbeddings
from langchain.schema import Document
from langchain.vectorstores.elasticsearch import ElasticsearchStore
from langchain.indexes import SQLRecordManager, index


collection_name = "test_index"

embedding = OpenAIEmbeddings()

vectorstore = ElasticsearchStore(
    es_url="http://localhost:9200",
    index_name="test_index",
    embedding=embedding)

namespace = f"elasticsearch/{collection_name}"
record_manager = SQLRecordManager(
    namespace, db_url="sqlite:///record_manager_cache.sql"
)
# record_manager.create_schema()

doc1 = Document(page_content="kitty", metadata={"source": "kitty.txt"})
doc2 = Document(page_content="doggy", metadata={"source": "doggy.txt"})
doc3 = Document(page_content="doggy1", metadata={"source": "doggy.txt"})

def _clear():
    """Hacky helper method to clear content. See the `full` mode section to to understand why it works."""
    index(
        [],
        record_manager,
        vectorstore,
        cleanup=None,
        source_id_key="source")

_clear()

res = index(
    [doc1, doc1, doc2],
    record_manager,
    vectorstore,
    cleanup=None,
    source_id_key="source",
)
print(res)
# {'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

res = index(
    [doc1, doc3],
    record_manager,
    vectorstore,
    cleanup=None,
    source_id_key="source",
)
print(res)
# {'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 0}

In [None]:
res = index(
    [doc1, doc1, doc2],
    record_manager,
    vectorstore,
    cleanup="full",
    source_id_key="source",
)
print(res)
# {'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

res = index(
    [doc1, doc3],
    record_manager,
    vectorstore,
    cleanup="full",
    source_id_key="source",
)
print(res)
# {'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}



In [None]:
# incremental "增量模式"，最常用的一种模式
_clear()

res = index(
    [doc1, doc1, doc2],
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)
print(res)
# {'num_added': 2, 'num_updated': 0, 'num_skipped': 0, 'num_deleted': 0}

res = index(
    [doc1, doc3],
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)
print(res)
# {'num_added': 1, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 1}

res = index(
    [doc1],
    record_manager,
    vectorstore,
    cleanup="incremental",
    source_id_key="source",
)
print(res)
# {'num_added': 0, 'num_updated': 0, 'num_skipped': 1, 'num_deleted': 0}

In [None]:
def index(
    docs_source: Union[BaseLoader, Iterable[Document]],
    record_manager: RecordManager,
    vector_store: VectorStore,
    *,
    batch_size: int = 100,
    cleanup: Literal["incremental", "full", None] = None,
    source_id_key: Union[str, Callable[[Document], str], None] = None,
    cleanup_batch_size: int = 1_000,
):
    ...

    if isinstance(docs_source, BaseLoader):
        try:
            doc_iterator = docs_source.lazy_load()
        except NotImplementedError:
            doc_iterator = iter(docs_source.load())
    else:
        doc_iterator = iter(docs_source)

    source_id_assigner = _get_source_id_assigner(source_id_key)

    # Mark when the update started.
    index_start_dt = record_manager.get_time()
    num_added = 0
    num_skipped = 0
    num_updated = 0
    num_deleted = 0
     
     
    for doc_batch in _batch(batch_size, doc_iterator):
        hashed_docs = list(
            _deduplicate_in_order(
                [_HashedDocument.from_document(doc) for doc in doc_batch]
            )
        )

        source_ids: Sequence[Optional[str]] = [
            source_id_assigner(doc) for doc in hashed_docs
        ]

        ...

        exists_batch = record_manager.exists([doc.uid for doc in hashed_docs])

        # Filter out documents that already exist in the record store.
        uids = []
        docs_to_index = []
         
        # 判断哪些是要更新，哪些是要添加的
        for hashed_doc, doc_exists in zip(hashed_docs, exists_batch):
            if doc_exists:
                # Must be updated to refresh timestamp.
                record_manager.update([hashed_doc.uid], time_at_least=index_start_dt)
                num_skipped += 1
                continue
            uids.append(hashed_doc.uid)
            docs_to_index.append(hashed_doc.to_document())

        # 知识入向量库
        if docs_to_index:
            vector_store.add_documents(docs_to_index, ids=uids)
            num_added += len(docs_to_index)

        # 更新数据库记录时间
        record_manager.update(
            [doc.uid for doc in hashed_docs],
            group_ids=source_ids,
            time_at_least=index_start_dt,
        )

        # 根据时间和source_ids 清理旧版本数据
        if cleanup == "incremental":
            ...

            uids_to_delete = record_manager.list_keys(
                group_ids=source_ids, before=index_start_dt
            )
            if uids_to_delete:
                vector_store.delete(uids_to_delete)
                record_manager.delete_keys(uids_to_delete)
                num_deleted += len(uids_to_delete)

    if cleanup == "full":
        while uids_to_delete := record_manager.list_keys(
            before=index_start_dt, limit=cleanup_batch_size
        ):
            # First delete from record store.
            vector_store.delete(uids_to_delete)
            # Then delete from record manager.
            record_manager.delete_keys(uids_to_delete)
            num_deleted += len(uidids_to_delete)

    return {
        "num_added": num_added,
        "num_updated": num_updated,
        "num_skipped": num_skipped,
        "num_deleted": num_deleted,
    }