<a href="https://colab.research.google.com/github/milvus-io/bootcamp/blob/master/bootcamp/tutorials/quickstart/full_text_search_with_milvus.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>   <a href="https://github.com/milvus-io/bootcamp/blob/master/bootcamp/tutorials/quickstart/full_text_search_with_milvus.ipynb" target="_blank">
    <img src="https://img.shields.io/badge/View%20on%20GitHub-555555?style=flat&logo=github&logoColor=white" alt="GitHub Repository"/>
</a>


# Hybrid Retrieval with Full-Text Search

[Full-text search](https://milvus.io/docs/full-text-search.md#Full-Text-Search) is a traditional method for retrieving documents by matching specific keywords or phrases in the text. It ranks results based on relevance scores calculated from factors like term frequency. While semantic search is better at understanding meaning and context, full-text search excels at precise keyword matching, making it a useful complement to semantic search. A common approach to constructing a Retrieval-Augmented Generation (RAG) pipeline involves retrieving documents through both semantic search and full-text search, followed by a reranking process to refine the results.

![](../../../images/advanced_rag/hybrid_and_rerank.png)

This approach converts text into sparse vectors for BM25 scoring. To ingest documents, users can simply input raw text without computing the sparse vector manually. Milvus will automatically generate and store the sparse vectors. To search documents, users just need to specify the text search query. Milvus will compute BM25 scores internally and return ranked results.
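To make the BM25 scoring mentioned above more concrete, here is a minimal sketch of the textbook Okapi BM25 formula in plain Python. It is not Milvus's internal implementation: the function name `bm25_scores`, the parameter values `k1=1.2` and `b=0.75`, and the toy documents are all assumptions chosen for illustration.

```python
import math
from collections import Counter


def bm25_scores(query_terms, docs, k1=1.2, b=0.75):
    """Score each tokenized document against the query with Okapi BM25."""
    n_docs = len(docs)
    avg_len = sum(len(d) for d in docs) / n_docs
    # Document frequency: in how many documents each term appears.
    df = Counter(term for d in docs for term in set(d))
    scores = []
    for d in docs:
        tf = Counter(d)
        score = 0.0
        for term in query_terms:
            if term not in tf:
                continue
            # Rare terms get a higher inverse document frequency.
            idf = math.log(1 + (n_docs - df[term] + 0.5) / (df[term] + 0.5))
            # Term frequency is dampened and normalized by document length.
            norm = tf[term] + k1 * (1 - b + b * len(d) / avg_len)
            score += idf * tf[term] * (k1 + 1) / norm
        scores.append(score)
    return scores


# Hypothetical, pre-tokenized toy corpus.
docs = [
    "milvus supports full text search".split(),
    "dense vectors capture semantic meaning".split(),
    "full text search ranks documents by keyword relevance".split(),
]
scores = bm25_scores("full text search".split(), docs)
```

Documents that share no term with the query score zero, and among matching documents the shorter one scores higher because of length normalization.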


Milvus also supports hybrid retrieval, which combines full-text search with dense-vector semantic search. Balancing keyword matching against semantic understanding usually improves search quality.
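One common way to merge the two result lists is reciprocal rank fusion (RRF), the strategy that `RRFRanker` is named after. The sketch below shows the general idea in plain Python. The helper `rrf_fuse`, the smoothing constant `k=60`, the 1-based ranks, and the document ids are assumptions for illustration; the exact formula Milvus applies may differ.

```python
from collections import defaultdict


def rrf_fuse(ranked_lists, k=60):
    """Fuse several ranked lists of document ids with reciprocal rank fusion."""
    scores = defaultdict(float)
    for ranking in ranked_lists:
        # Each list contributes 1 / (k + rank) for every document it contains.
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += 1.0 / (k + rank)
    # Highest fused score first.
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


# Hypothetical result lists from a keyword search and a semantic search.
keyword_hits = ["doc_1", "doc_63", "doc_7"]
semantic_hits = ["doc_63", "doc_2", "doc_1"]
fused = rrf_fuse([keyword_hits, semantic_hits])
```

Documents that appear near the top of both lists accumulate the largest scores, so the fusion needs only ranks and never compares raw BM25 and cosine scores directly.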

> Full-text search is currently available in Milvus Standalone, Milvus Distributed, and Zilliz Cloud, but it is not yet supported in Milvus Lite, where it is planned for a future release. Reach out to support@zilliz.com for more information.


## Preparation

### Install PyMilvus

In [1]:
! pip install pymilvus -U

> If you are using Google Colab, to enable dependencies just installed, you may need to **restart the runtime** (click on the "Runtime" menu at the top of the screen, and select "Restart session" from the dropdown menu).

### Set OpenAI API Key
We will use an OpenAI model to generate responses; the dense embeddings in this notebook come from the local `MultilingualEmbeddingModel` imported below. Prepare your [API key](https://platform.openai.com/docs/quickstart) and set it as the `OPENAI_API_KEY` environment variable.

In [8]:
import sys
import os

# Append the parent folder, not 'src'
sys.path.append(os.path.abspath(r'C:\Users\maha_\RAG_Workflow'))

from src.embedding_model.multilingual_embed import MultilingualEmbeddingModel




## Setup and Configuration

Import the necessary libraries.

In [9]:

import json

from pymilvus import (
    MilvusClient,
    DataType,
    Function,
    FunctionType,
    AnnSearchRequest,
    RRFRanker,
)

from src.embedding_model.multilingual_embed import MultilingualEmbeddingModel

In [10]:


class HybridRetriever:
    def __init__(self, uri, db_name, collection_name="hybrid", 
                 dense_embedding_function=None):
        self.uri = uri
        self.db_name = db_name
        self.collection_name = collection_name
        self.embedding_function = dense_embedding_function
        self.use_reranker = True
        self.use_sparse = True
        self.client = MilvusClient(uri=self.uri, db_name=self.db_name)

    def build_collection(self):
        # Dimension of the dense embeddings. It is hard-coded here and must
        # match the output size of the embedding model.
        dense_dim = 512

        tokenizer_params = {
            "tokenizer": "standard",
            "filter": [
                "lowercase",
                {
                    "type": "length",
                    "max": 200,
                },
                {"type": "stemmer", "language": "english"},
                {
                    "type": "stop",
                    "stop_words": [
                        "a",
                        "an",
                        "and",
                        "are",
                        "as",
                        "at",
                        "be",
                        "but",
                        "by",
                        "for",
                        "if",
                        "in",
                        "into",
                        "is",
                        "it",
                        "no",
                        "not",
                        "of",
                        "on",
                        "or",
                        "such",
                        "that",
                        "the",
                        "their",
                        "then",
                        "there",
                        "these",
                        "they",
                        "this",
                        "to",
                        "was",
                        "will",
                        "with",
                    ],
                },
            ],
        }

        schema = MilvusClient.create_schema(enable_dynamic_field=True)
        schema.add_field(
            field_name="pk",
            datatype=DataType.VARCHAR,
            is_primary=True,
            auto_id=True,
            max_length=100,
        )
        schema.add_field(
            field_name="content",
            datatype=DataType.VARCHAR,
            max_length=65535,
            analyzer_params=tokenizer_params,
            enable_match=True,
            enable_analyzer=True,
        )
        schema.add_field(
            field_name="sparse_vector", datatype=DataType.SPARSE_FLOAT_VECTOR
        )
        schema.add_field(
            field_name="dense_vector", datatype=DataType.FLOAT_VECTOR, dim=dense_dim
        )
        schema.add_field(
            field_name="original_uuid", datatype=DataType.VARCHAR, max_length=128
        )
        schema.add_field(field_name="doc_id", datatype=DataType.VARCHAR, max_length=64)
        schema.add_field(
            field_name="chunk_id", datatype=DataType.VARCHAR, max_length=64
        )
        schema.add_field(field_name="original_index", datatype=DataType.INT32)

        functions = Function(
            name="bm25",
            function_type=FunctionType.BM25,
            input_field_names=["content"],
            output_field_names=["sparse_vector"],
        )

        schema.add_function(functions)

        index_params = MilvusClient.prepare_index_params()
        index_params.add_index(
            field_name="sparse_vector",
            index_type="SPARSE_INVERTED_INDEX",
            metric_type="BM25",
        )
        index_params.add_index(
            field_name="dense_vector", index_type="AUTOINDEX", metric_type="COSINE"
        )

        self.client.create_collection(
            collection_name=self.collection_name,
            schema=schema,
            index_params=index_params,
        )

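    def insert_data(self, chunk, metadata):
        # Embed the chunk. Handle models that return {"dense": [...]} as well as
        # models that return a plain list of vectors, mirroring search().
        embedding = self.embedding_function.embed([chunk])
        if isinstance(embedding, dict) and "dense" in embedding:
            dense_vec = embedding["dense"][0]
        else:
            dense_vec = embedding[0]
        # The sparse vector is generated by the BM25 function from "content".
        self.client.insert(
            self.collection_name, {"dense_vector": dense_vec, **metadata}
        )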

    def search(self, query: str, k: int = 20, mode="hybrid"):

        output_fields = [
            "content",
            "original_uuid",
            "doc_id",
            "chunk_id",
            "original_index",
        ]
        if mode in ["dense", "hybrid"]:
            embedding = self.embedding_function.embed([query])
            if isinstance(embedding, dict) and "dense" in embedding:
                dense_vec = embedding["dense"][0]
            else:
                dense_vec = embedding[0]

        if mode == "sparse":
            results = self.client.search(
                collection_name=self.collection_name,
                data=[query],
                anns_field="sparse_vector",
                limit=k,
                output_fields=output_fields,
            )
        elif mode == "dense":
            results = self.client.search(
                collection_name=self.collection_name,
                data=[dense_vec],
                anns_field="dense_vector",
                limit=k,
                output_fields=output_fields,
            )
        elif mode == "hybrid":
            full_text_search_params = {"metric_type": "BM25"}
            full_text_search_req = AnnSearchRequest(
                [query], "sparse_vector", full_text_search_params, limit=k
            )

            dense_search_params = {"metric_type": "COSINE"}
            dense_req = AnnSearchRequest(
                [dense_vec], "dense_vector", dense_search_params, limit=k
            )

            results = self.client.hybrid_search(
                self.collection_name,
                [full_text_search_req, dense_req],
                ranker=RRFRanker(),
                limit=k,
                output_fields=output_fields,
            )
        else:
            raise ValueError(
                f"Invalid mode {mode!r}; expected 'sparse', 'dense', or 'hybrid'"
            )
        return [
            {
                "doc_id": doc["entity"]["doc_id"],
                "chunk_id": doc["entity"]["chunk_id"],
                "content": doc["entity"]["content"],
                "score": doc["distance"],
            }
            for doc in results[0]
        ]
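The analyzer configured in `build_collection` can be pictured as a small token pipeline: split, lowercase, drop overlong tokens, stem, remove stop words. The sketch below approximates that pipeline in plain Python. It is only an illustration: the regular expression is a rough stand-in for the `standard` tokenizer, the stop-word list is shortened, stemming is omitted, and the function name `analyze` is hypothetical.

```python
import re

# Shortened stop-word list for illustration; the collection uses a longer one.
STOP_WORDS = {"a", "an", "and", "the", "of", "is", "to", "in"}


def analyze(text, max_len=200, stop_words=STOP_WORDS):
    """Approximate the tokenizer and filter chain on a single string."""
    # Rough stand-in for the "standard" tokenizer: split on non-alphanumerics.
    tokens = re.findall(r"[A-Za-z0-9]+", text)
    # "lowercase" filter.
    tokens = [t.lower() for t in tokens]
    # "length" filter: drop tokens longer than max_len characters.
    tokens = [t for t in tokens if len(t) <= max_len]
    # The "stemmer" filter is omitted in this sketch.
    # "stop" filter: drop very common words.
    return [t for t in tokens if t not in stop_words]


tokens = analyze("The purpose of the DiffExecutor struct")
print(tokens)
```

Only the surviving tokens take part in BM25 matching, which is why a query and a document can match even when their casing or filler words differ.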

In [11]:
model = MultilingualEmbeddingModel()
model.load_model()
standard_retriever = HybridRetriever(
    uri="http://localhost:19530",
    db_name="default",
    collection_name="milvus_hybrid",
    dense_embedding_function=model)

The retriever uses `MilvusClient` to connect to the Milvus server at the given `uri`.

## Data Ingestion

In [12]:
from tqdm import tqdm
import json

path = "codebase_chunks.json"
with open(path, "r", encoding="utf-8") as f:
    dataset = json.load(f)

is_insert = True
if is_insert:
    standard_retriever.build_collection()
    
    # Wrap the dataset with tqdm for progress tracking
    for doc in tqdm(dataset, desc="Processing Documents", unit="doc"):
        doc_content = doc["content"]
        
        # Wrap the chunks of each document with tqdm
        for chunk in tqdm(doc["chunks"], desc="Processing Chunks", unit="chunk", leave=False):
            metadata = {
                "doc_id": doc["doc_id"],
                "original_uuid": doc["original_uuid"],
                "chunk_id": chunk["chunk_id"],
                "original_index": chunk["original_index"],
                "content": chunk["content"],
            }
            chunk_content = chunk["content"]
            
            # Insert data with progress tracking
            standard_retriever.insert_data(chunk_content[:500], metadata)

Processing Documents: 100%|██████████| 90/90 [01:30<00:00,  1.01s/doc]
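The loop above sends one row per `insert` call. Since `MilvusClient.insert` also accepts a list of row dictionaries, grouping rows into batches is a possible way to reduce round trips. The helper below is a minimal sketch of that grouping step only; the name `batched`, the batch size, and the sample rows are hypothetical, and any speedup is untested here.

```python
def batched(rows, batch_size):
    """Yield successive lists of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Hypothetical rows; real rows would also carry "dense_vector" and metadata.
rows = [{"chunk_id": f"chunk_{i}"} for i in range(5)]
batches = list(batched(rows, 2))

# Each batch could then be sent in a single call, for example:
# client.insert(collection_name, batch)
```

Embedding several chunks per `embed` call would be a natural companion change, assuming the embedding model supports batch input.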


In [None]:
results = standard_retriever.search("What is the purpose of the DiffExecutor struct?", mode="hybrid", k=3)


[{'doc_id': 'doc_1', 'chunk_id': 'doc_1_chunk_0', 'content': '//! Executor for differential fuzzing.\n//! It wraps two executors that will be run after each other with the same input.\n//! In comparison to the [`crate::executors::CombinedExecutor`] it also runs the secondary executor in `run_target`.\n//!\nuse core::{cell::UnsafeCell, fmt::Debug, ptr};\n\nuse libafl_bolts::{ownedref::OwnedMutPtr, tuples::MatchName};\nuse serde::{Deserialize, Serialize};\n\nuse crate::{\n    executors::{Executor, ExitKind, HasObservers},\n    inputs::UsesInput,\n    observers::{DifferentialObserversTuple, ObserversTuple, UsesObservers},\n    state::UsesState,\n    Error,\n};\n\n/// A [`DiffExecutor`] wraps a primary executor, forwarding its methods, and a secondary one\n#[derive(Debug)]\npub struct DiffExecutor<A, B, OTA, OTB, DOT> {\n    primary: A,\n    secondary: B,\n    observers: UnsafeCell<ProxyObserversTuple<OTA, OTB, DOT>>,\n}\n\n', 'score': 0.016393441706895828}, {'doc_id': 'doc_63', 'chunk_id'

In [None]:
print(results)



In [None]:
pip install openai


In [3]:
from openai import OpenAI

class OpenAILLM:
    def __init__(self, api_key: str, model: str = "gpt-4"):
        self.api_key = api_key
        self.model = model
        # openai>=1.0.0 uses an explicit client object instead of module-level state.
        self.client = OpenAI(api_key=self.api_key)

    def generate(self, prompt: str, temperature: float = 0.7, max_tokens: int = 512) -> str:
        # Send the prompt as a single user message and return the reply text.
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[
                {"role": "user", "content": prompt}
            ],
            temperature=temperature,
            max_tokens=max_tokens
        )
        return response.choices[0].message.content.strip()

# Example usage:
# llm = OpenAILLM(api_key="your-openai-api-key")
# print(llm.generate("Explain quantum computing in simple terms."))


In [4]:
def build_prompt_with_docs(retrieved_docs: list[str], user_query: str) -> str:
    """
    Build a prompt for the LLM using retrieved documents and a user query.

    Args:
        retrieved_docs (list[str]): List of strings representing the retrieved documents.
        user_query (str): The user's original question.

    Returns:
        str: A formatted prompt to send to the LLM.
    """
    context = "\n\n".join(f"Document {i+1}:\n{doc}" for i, doc in enumerate(retrieved_docs))
    prompt = (
        f"You are an AI assistant helping a user based on the following retrieved documents:\n\n"
        f"{context}\n\n"
        f"Answer the following question using only the information in the documents:\n"
        f"\"{user_query}\"\n\n"
        f"If the answer cannot be found in the documents, say \"I don't know based on the provided information.\""
    )
    return prompt


In [None]:

retrieved_docs = [
    "Milvus is a vector database designed for high-performance similarity search.",
    "It supports scalar filtering, indexing, and full-text search via BM25."
]

user_query = "How does Milvus support full-text search?"


prompt = build_prompt_with_docs(retrieved_docs, user_query)


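import os

# Read the key from the OPENAI_API_KEY environment variable instead of hard-coding it.
llm = OpenAILLM(api_key=os.environ["OPENAI_API_KEY"])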
answer = llm.generate(prompt)
print(answer)


> **Note:** `openai.ChatCompletion` is no longer supported in `openai>=1.0.0`, and calling it raises `APIRemovedInV1`. Use the 1.x client interface (see the README at https://github.com/openai/openai-python), run `openai migrate` to upgrade existing code automatically, or pin the old version with `pip install openai==0.28`. A detailed migration guide is available at https://github.com/openai/openai-python/discussions/742.
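The prompt example above passes hard-coded strings as `retrieved_docs`. To use actual retrieval output instead, the dictionaries returned by `HybridRetriever.search` need to be turned into strings first. The sketch below shows one possible conversion; the helper name `results_to_docs`, the label format, and the sample hit are hypothetical.

```python
def results_to_docs(results):
    """Turn retriever hits into labeled strings suitable for a prompt builder."""
    return [
        f"[{hit['doc_id']}/{hit['chunk_id']}]\n{hit['content']}"
        for hit in results
    ]


# Hypothetical hit in the shape returned by HybridRetriever.search.
sample_results = [
    {
        "doc_id": "doc_1",
        "chunk_id": "doc_1_chunk_0",
        "content": "pub struct DiffExecutor",
        "score": 0.016,
    },
]
retrieved_docs = results_to_docs(sample_results)

# retrieved_docs can then be passed on, for example:
# prompt = build_prompt_with_docs(retrieved_docs, user_query)
```

Keeping the document and chunk ids in each string makes it easier to trace an answer back to the chunk it came from.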
