In [1]:
# Load and read PDF content
from langchain_community.document_loaders import PDFPlumberLoader
loader = PDFPlumberLoader("pdf/bgita.pdf")
docs = loader.load()
print("Number of pages in the PDF:", len(docs))

# Extract text from each page for embedding
doc_texts = [page.page_content for page in docs]
#print(" Document  texts",doc_texts)

Number of pages in the PDF: 154


In [2]:
from milvus_model.hybrid import BGEM3EmbeddingFunction
import torch
device = "mps" if torch.backends.mps.is_available() else "cpu"
ef = BGEM3EmbeddingFunction(use_fp16=False, device=device)
dense_dim = ef.dim["dense"]

# Generate embeddings using BGE-M3 model
docs_embeddings = ef(doc_texts)

  from .autonotebook import tqdm as notebook_tqdm
Fetching 30 files: 100%|██████████| 30/30 [00:00<00:00, 136474.10it/s]
You're using a XLMRobertaTokenizerFast tokenizer. Please note that with a fast tokenizer, using the `__call__` method is faster than using a method to encode the text followed by a call to the `pad` method to get a padded encoding.


In [3]:
dense_dim

1024

In [4]:
from pymilvus import (
    FieldSchema,
    utility,
    CollectionSchema,
    DataType,
    connections,
    Collection,
    AnnSearchRequest,
    WeightedRanker,
)

# Zilliz Cloud connection parameters
uri = "https://in03-4e569f605c32eab.serverless.gcp-us-west1.cloud.zilliz.com"  # e.g., "https://xxx.zillizcloud.com:443"
token ="99cd003de770782d436a049c87fb669188dc4424443531a325043d7f42859ca8c3d058b952d2e92d33677cf72b4931d12150c29d"
# Connect to Zilliz Cloud
connections.connect(
    alias="default",  # Connection alias, default is 'default'
    uri=uri,
    token=token,
    secure=True
)

# Rest of your schema definition remains the same
fields = [
    FieldSchema(
        name="pk", dtype=DataType.VARCHAR, is_primary=True, auto_id=True, max_length=100
    ),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="sparse_vector", dtype=DataType.SPARSE_FLOAT_VECTOR),
    FieldSchema(name="dense_vector", dtype=DataType.FLOAT_VECTOR, dim=1024),
]
schema = CollectionSchema(fields)


In [5]:
# Create collection (drop the old one if exists)
col_name = "bhagavad_gita"
if utility.has_collection(col_name):
    Collection(col_name).drop()

# Create collection with the same schema but with cloud-optimized settings
col = Collection(
    name=col_name,
    schema=schema,
    consistency_level="Eventually"  # Cloud recommended consistency level
)

# Create indices for hybrid search - optimized for cloud deployment
sparse_index = {
    "index_type": "SPARSE_INVERTED_INDEX",
    "metric_type": "IP",
    "params": {}  # Cloud default parameters
}
col.create_index(
    field_name="sparse_vector",
    index_params=sparse_index
)

dense_index = {
    "index_type": "IVF_FLAT",    # Changed from AUTOINDEX for better cloud performance
    "metric_type": "IP",
    "params": {
        "nlist": 1024            # Number of clusters, adjust based on your data size
    }
}
col.create_index(
    field_name="dense_vector",
    index_params=dense_index
)

# Load collection into memory
col.load()


In [6]:
import time
from pymilvus import utility

def insert_batch(collection, texts, sparse_vectors, dense_vectors, batch_size=50):
    total_inserted = 0

    try:
        # For efficiency, we insert batch_size records at a time
        for i in range(0, len(texts), batch_size):
            batch_end = min(i + batch_size, len(texts))

            # Prepare the current batch
            batch_texts = texts[i:batch_end]
            batch_sparse = sparse_vectors[i:batch_end]
            batch_dense = dense_vectors[i:batch_end]

            # Create entities list for insertion
            entities = [
                batch_texts,      # Text content
                batch_sparse,     # Sparse vector embeddings
                batch_dense,      # Dense vector embeddings
            ]

            # Insert with retry mechanism
            max_retries = 3
            for attempt in range(max_retries):
                try:
                    insert_result = collection.insert(entities)
                    total_inserted += len(batch_texts)

                    # Ensure data is persisted
                    collection.flush()

                    print(f"Successfully inserted batch {i//batch_size + 1}, "
                          f"Records {i} to {batch_end}")
                    break

                except Exception as e:
                    if attempt == max_retries - 1:
                        print(f"Failed to insert batch after {max_retries} attempts: {e}")
                        raise
                    print(f"Retry {attempt + 1}/{max_retries} after error: {e}")
                    time.sleep(2 ** attempt)  # Exponential backoff

    except Exception as e:
        print(f"Error during insertion: {e}")
        raise

    return total_inserted

# Extract the raw text from Document objects
doc_texts = [doc.page_content for doc in docs]

# Perform the insertion
try:
    total_inserted = insert_batch(
        collection=col,
        texts=doc_texts,
        sparse_vectors=docs_embeddings["sparse"],
        dense_vectors=docs_embeddings["dense"],
        batch_size=50
    )

    # Verify insertion
    print(f"Total entities inserted: {total_inserted}")
    print(f"Collection entity count: {col.num_entities}")

    # Optional: Create an alias for the collection
    utility.create_alias(
        collection_name=col.name,
        alias="latest_docs"
    )

except Exception as e:
    print(f"Insertion process failed: {e}")


Successfully inserted batch 1, Records 0 to 50
Successfully inserted batch 2, Records 50 to 100
Successfully inserted batch 3, Records 100 to 150
Successfully inserted batch 4, Records 150 to 154
Total entities inserted: 154
Collection entity count: 154


RPC error: [create_alias], <MilvusException: (code=1602, message=latest_docs is alias to another collection: bhagavad_gita: alias already exist[database=db_4e569f605c32eab][alias=latest_docs])>, <Time:{'RPC start': '2025-02-05 14:05:29.332312', 'RPC error': '2025-02-05 14:05:29.635918'}>


Insertion process failed: <MilvusException: (code=1602, message=latest_docs is alias to another collection: bhagavad_gita: alias already exist[database=db_4e569f605c32eab][alias=latest_docs])>


In [7]:
# Enter your search query
query = "Explain Bhagavad Gita"
print(query)

# Generate embeddings for the query
query_embeddings = ef([query])
print(query_embeddings)

Explain Bhagavad Gita
{'dense': [array([-0.05035954, -0.00241854, -0.04419456, ..., -0.01265165,
        0.0194218 , -0.03085409], dtype=float32)], 'sparse': <Compressed Sparse Row sparse array of dtype 'float64'
	with 6 stored elements and shape (1, 250002)>}
