From 8572a6dbc0e5e58923f63fc2e619dd415f18dd1e Mon Sep 17 00:00:00 2001 From: Quang Hoa <84486392+hoanq1811@users.noreply.github.com> Date: Sat, 10 Feb 2024 06:21:31 +0700 Subject: [PATCH] community[patch]: Make some functions work with Milvus (#10695) **Description** Make some functions work with Milvus: 1. get_ids: Get primary keys by field in the metadata 2. delete: Delete one or more entities by ids 3. upsert: Update/Insert one or more entities **Issue** None **Dependencies** None **Tag maintainer:** @hwchase17 **Twitter handle:** None --------- Co-authored-by: HoaNQ9 Co-authored-by: Erick Friis --- .../integrations/vectorstores/milvus.ipynb | 121 ++++++++++++------ .../vectorstores/milvus.py | 61 +++++++++ .../vectorstores/test_milvus.py | 42 +++++- 3 files changed, 187 insertions(+), 37 deletions(-) diff --git a/docs/docs/integrations/vectorstores/milvus.ipynb b/docs/docs/integrations/vectorstores/milvus.ipynb index 79d0df5c3de5280..d9f78e033ca26ac 100644 --- a/docs/docs/integrations/vectorstores/milvus.ipynb +++ b/docs/docs/integrations/vectorstores/milvus.ipynb @@ -204,23 +204,29 @@ }, { "cell_type": "markdown", + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%% md\n" + } + }, "source": [ "### Per-User Retrieval\n", "\n", "When building a retrieval app, you often have to build it with multiple users in mind. This means that you may be storing data not just for one user, but for many different users, and they should not be able to see eachother’s data.\n", "\n", "Milvus recommends using [partition_key](https://milvus.io/docs/multi_tenancy.md#Partition-key-based-multi-tenancy) to implement multi-tenancy, here is an example." 
- ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%% md\n" - } - } + ] }, { "cell_type": "code", "execution_count": 2, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, "outputs": [], "source": [ "from langchain_core.documents import Document\n", @@ -236,16 +242,16 @@ " drop_old=True,\n", " partition_key_field=\"namespace\", # Use the \"namespace\" field as the partition key\n", ")" - ], + ] + }, + { + "cell_type": "markdown", "metadata": { "collapsed": false, "pycharm": { - "name": "#%%\n" + "name": "#%% md\n" } - } - }, - { - "cell_type": "markdown", + }, "source": [ "To conduct a search using the partition key, you should include either of the following in the boolean expression of the search request:\n", "\n", @@ -256,21 +262,23 @@ "Do replace `` with the name of the field that is designated as the partition key.\n", "\n", "Milvus changes to a partition based on the specified partition key, filters entities according to the partition key, and searches among the filtered entities.\n" - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%% md\n" - } - } + ] }, { "cell_type": "code", "execution_count": 3, + "metadata": { + "collapsed": false, + "pycharm": { + "name": "#%%\n" + } + }, "outputs": [ { "data": { - "text/plain": "[Document(page_content='i worked at facebook', metadata={'namespace': 'ankush'})]" + "text/plain": [ + "[Document(page_content='i worked at facebook', metadata={'namespace': 'ankush'})]" + ] }, "execution_count": 3, "metadata": {}, @@ -282,21 +290,23 @@ "vectorstore.as_retriever(\n", " search_kwargs={\"expr\": 'namespace == \"ankush\"'}\n", ").get_relevant_documents(\"where did i work?\")" - ], + ] + }, + { + "cell_type": "code", + "execution_count": 4, "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } - } - }, - { - "cell_type": "code", - "execution_count": 4, + }, "outputs": [ { "data": { - "text/plain": "[Document(page_content='i worked at kensho', 
metadata={'namespace': 'harrison'})]" + "text/plain": [ + "[Document(page_content='i worked at kensho', metadata={'namespace': 'harrison'})]" + ] }, "execution_count": 4, "metadata": {}, @@ -308,13 +318,52 @@ "vectorstore.as_retriever(\n", " search_kwargs={\"expr\": 'namespace == \"harrison\"'}\n", ").get_relevant_documents(\"where did i work?\")" - ], - "metadata": { - "collapsed": false, - "pycharm": { - "name": "#%%\n" - } - } + ] + }, + { + "cell_type": "markdown", + "id": "89756e9e", + "metadata": {}, + "source": [ + "**To delete or upsert (update/insert) one or more entities:**" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21c4edcf", + "metadata": {}, + "outputs": [], + "source": [ + "from langchain.docstore.document import Document\n", + "\n", + "# Insert data sample\n", + "docs = [\n", + " Document(page_content=\"foo\", metadata={\"id\": 1}),\n", + " Document(page_content=\"bar\", metadata={\"id\": 2}),\n", + " Document(page_content=\"baz\", metadata={\"id\": 3}),\n", + "]\n", + "vector_db = Milvus.from_documents(\n", + " docs,\n", + " embeddings,\n", + " connection_args={\"host\": \"127.0.0.1\", \"port\": \"19530\"},\n", + ")\n", + "\n", + "# Search pks (primary keys) using expression\n", + "expr = \"id in [1,2]\"\n", + "pks = vector_db.get_pks(expr)\n", + "\n", + "# Delete entities by pks\n", + "result = vector_db.delete(pks)\n", + "\n", + "# Upsert (Update/Insert)\n", + "new_docs = [\n", + " Document(page_content=\"new_foo\", metadata={\"id\": 1}),\n", + " Document(page_content=\"new_bar\", metadata={\"id\": 2}),\n", + " Document(page_content=\"upserted_bak\", metadata={\"id\": 3}),\n", + "]\n", + "upserted_pks = vector_db.upsert(pks, new_docs)" + ] } ], "metadata": { @@ -338,4 +387,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/libs/community/langchain_community/vectorstores/milvus.py b/libs/community/langchain_community/vectorstores/milvus.py index a09c96c4a48f1f5..365ada0ce6a4904 
100644
--- a/libs/community/langchain_community/vectorstores/milvus.py
+++ b/libs/community/langchain_community/vectorstores/milvus.py
@@ -989,3 +989,64 @@ def _parse_document(self, data: dict) -> Document:
             page_content=data.pop(self._text_field),
             metadata=data.pop(self._metadata_field) if self._metadata_field else data,
         )
+
+    def get_pks(self, expr: str, **kwargs: Any) -> Optional[List[int]]:
+        """Get the primary keys of the entities matching an expression.
+
+        Args:
+            expr: Expression - E.g: "id in [1, 2]", or "title LIKE 'Abc%'"
+
+        Returns:
+            Primary keys of the matches, or None if there is no collection.
+
+        Raises:
+            MilvusException: If the query fails (logged, then re-raised).
+        """
+        from pymilvus import MilvusException
+
+        if self.col is None:
+            logger.debug("No existing collection to get pk.")
+            return None
+
+        try:
+            query_result = self.col.query(
+                expr=expr, output_fields=[self._primary_field]
+            )
+        except MilvusException as exc:
+            logger.error("Failed to get ids: %s error: %s", self.collection_name, exc)
+            raise
+        return [item.get(self._primary_field) for item in query_result]
+
+    def upsert(
+        self,
+        ids: Optional[List[str]] = None,
+        documents: Optional[List[Document]] = None,
+        **kwargs: Any,
+    ) -> Optional[List[str]]:
+        """Update/Insert documents to the vectorstore.
+
+        Args:
+            ids: Primary keys to delete first (best effort) - see get_pks.
+            documents: Documents to add to the vectorstore.
+
+        Returns:
+            IDs of the added documents, or None if there is nothing to upsert.
+        """
+        from pymilvus import MilvusException
+
+        if not documents:
+            logger.debug("No documents to upsert.")
+            return None
+
+        if ids:
+            try:
+                self.delete(ids=ids)
+            except MilvusException as exc:  # best effort: still insert
+                logger.warning("Failed to delete before upsert: %s", exc)
+        try:
+            return self.add_documents(documents=documents)
+        except MilvusException as exc:
+            logger.error(
+                "Failed to upsert entities: %s error: %s", self.collection_name, exc
+            )
+            raise
diff --git a/libs/community/tests/integration_tests/vectorstores/test_milvus.py b/libs/community/tests/integration_tests/vectorstores/test_milvus.py
index 807edcdb6e420a6..b214349f9720fd2 100644
--- a/libs/community/tests/integration_tests/vectorstores/test_milvus.py
+++ b/libs/community/tests/integration_tests/vectorstores/test_milvus.py
@@ -1,5 +1,5 @@
 """Test Milvus functionality."""
-from typing import List, Optional
+from typing import Any, List, Optional
 
 from langchain_core.documents import Document
 
@@ -25,6 +25,10 @@ def _milvus_from_texts(
     )
 
 
+def _get_pks(expr: str, docsearch: Milvus) -> List[Any]:
+    return docsearch.get_pks(expr)
+
+
 def test_milvus() -> None:
     """Test end to end construction and search."""
     docsearch = _milvus_from_texts()
@@ -109,6 +113,42 @@ def test_milvus_no_drop() -> None:
     assert len(output) == 6
 
 
+def test_milvus_get_pks() -> None:
+    """Test end to end construction and get pks with expr"""
+    texts = ["foo", "bar", "baz"]
+    metadatas = [{"id": i} for i in range(len(texts))]
+    docsearch = _milvus_from_texts(metadatas=metadatas)
+    expr = "id in [1,2]"
+    output = _get_pks(expr, docsearch)
+    assert len(output) == 2
+
+
+def test_milvus_delete_entities() -> None:
+    """Test end to end construction and delete entities"""
+    texts = ["foo", "bar", "baz"]
+    metadatas = [{"id": i} for i in range(len(texts))]
+    docsearch = _milvus_from_texts(metadatas=metadatas)
+    expr = "id in [1,2]"
+    pks = _get_pks(expr, docsearch)
+    result = docsearch.delete(pks)
+    assert result is True
+
+
+def test_milvus_upsert_entities() -> None:
+    """Test end to end construction and upsert entities"""
+    texts = ["foo", "bar", "baz"]
+    metadatas = [{"id": i} for i in range(len(texts))]
+    docsearch = _milvus_from_texts(metadatas=metadatas)
+    expr = "id in [1,2]"
+    pks = _get_pks(expr, docsearch)
+    documents = [
+        Document(page_content="test_1", metadata={"id": 1}),
+        Document(page_content="test_2", metadata={"id": 3}),
+    ]
+    ids = docsearch.upsert(pks, documents)
+    assert len(ids) == 2
+
+
 # if __name__ == "__main__":
 #     test_milvus()
 #     test_milvus_with_metadata()