In [34]:
# connect to milvus db

from langchain_milvus import Milvus
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType, utility
import os 
from dotenv import load_dotenv

load_dotenv()
MILVUS_URI = "https://in03-d8f19805795ae7b.serverless.aws-eu-central-1.cloud.zilliz.com"
MILVUS_TOKEN = "e548ca0b2cdf6d09cbe43de608c2a80a52d001d36248cdbc3aab61e9ba2ff49eb57fe576b2c7d792fab406f1cb35470b860abf96"
MILVUS_COLLECTION_NAME = "translations"

# Connect to Milvus
connections.connect(
    uri=MILVUS_URI,
    token=MILVUS_TOKEN
)

# Get embedding dimension (Google embedding-001 uses 768 dimensions)
embedding_model = GoogleGenerativeAIEmbeddings(model="models/embedding-001", google_api_key=os.getenv("GEMINI_API_KEY"))
EMBEDDING_DIM = 768  # Google embedding-001 dimension

# Define custom schema with all required fields
# This prevents schema resets by explicitly defining the structure
id_field = FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True)
source_text_field = FieldSchema(name="source_text", dtype=DataType.VARCHAR, max_length=10000)
source_language_field = FieldSchema(name="source_language", dtype=DataType.VARCHAR, max_length=50)
translation_text_field = FieldSchema(name="translation_text", dtype=DataType.VARCHAR, max_length=10000)
translation_language_field = FieldSchema(name="translation_language", dtype=DataType.VARCHAR, max_length=50)
# Vector field for translation_text and source embeddings
source_vector_field = FieldSchema(name="source_vector", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM)
translation_vector_field = FieldSchema(name="translation_vector", dtype=DataType.FLOAT_VECTOR, dim=EMBEDDING_DIM)

# Create collection schema
schema = CollectionSchema(
    fields=[
        id_field,
        source_text_field,
        source_vector_field,
        source_language_field,
        translation_text_field,
        translation_language_field,
        translation_vector_field
    ],
    description="Collection for storing translations with source text, translation text, languages, and translation embeddings"
)

# Create collection if it doesn't exist, otherwise use existing one
if utility.has_collection(MILVUS_COLLECTION_NAME):
    print(f"Collection '{MILVUS_COLLECTION_NAME}' already exists. Using existing collection.")
    collection = Collection(MILVUS_COLLECTION_NAME)
    
    # Check and create indexes if they don't exist
    # Note: Collection must be released before creating indexes
    index_params = {
        "metric_type": "L2",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 1024}
    }
    
    # Release collection if loaded (needed before creating indexes)
    try:
        collection.release()
    except:
        pass  # Collection might not be loaded
    
    # Check if indexes exist, create if missing
    # Use try-except to check if index exists (has_index has parameter issues)
    def index_exists(collection, field_name):
        """Check if an index exists for a given field name."""
        try:
            # Try to get indexes - if field has index, it will be in the list
            indexes = collection.indexes
            return any(idx.field_name == field_name for idx in indexes)
        except Exception:
            # If we can't check, assume it doesn't exist
            return False
    
    if not index_exists(collection, "source_vector"):
        print("Creating index for source_vector field...")
        collection.create_index(field_name="source_vector", index_params=index_params)
    
    if not index_exists(collection, "translation_vector"):
        print("Creating index for translation_vector field...")
        collection.create_index(field_name="translation_vector", index_params=index_params)
    
    # Load collection after ensuring indexes exist
    collection.load()
else:
    print(f"Creating new collection '{MILVUS_COLLECTION_NAME}' with custom schema.")
    collection = Collection(name=MILVUS_COLLECTION_NAME, schema=schema)
    # Create indexes on both vector fields for efficient search
    index_params = {
        "metric_type": "L2",
        "index_type": "IVF_FLAT",
        "params": {"nlist": 1024}
    }
    print("Creating indexes for vector fields...")
    collection.create_index(field_name="source_vector", index_params=index_params)
    collection.create_index(field_name="translation_vector", index_params=index_params)
    collection.load()

# Now use langchain_milvus with the existing collection
# Map translation_text to the text field and translation_vector to the vector field
vectorstore = Milvus(
    embedding_function=embedding_model,
    connection_args={
        "uri": MILVUS_URI,
        "token": MILVUS_TOKEN
    },
    collection_name=MILVUS_COLLECTION_NAME,
    vector_field="source_vector"
)

from langchain_core.stores import InMemoryByteStore

# This saves the parent documents to a folder on your disk
store = InMemoryByteStore()

Collection 'translations' already exists. Using existing collection.


In [35]:
# Helper function to insert translation data with all custom fields
# Note: langchain_milvus only handles text_field and vector_field automatically
# For full control over all fields, use pymilvus directly:

def insert_translation_data(
    source_text: str,
    source_language: str,
    translation_text: str,
    translation_language: str,
    embedding_model: GoogleGenerativeAIEmbeddings
):
    """
    Insert translation data with all custom schema fields.
    
    Args:
        source_text: Original source text
        source_language: Language code of source text (e.g., 'tib', 'en')
        translation_text: Translated text
        translation_language: Language code of translation (e.g., 'en', 'tib')
        embedding_model: Embedding model to generate vectors
    """
    # Generate embeddings for both source and translation text
    source_vector = embedding_model.embed_query(source_text)
    translation_vector = embedding_model.embed_query(translation_text)
    
    # Prepare data for insertion (pymilvus accepts list of dicts)
    # Note: id field is auto-generated, so we don't include it
    data = [{
        "source_text": source_text,
        "source_vector": source_vector,
        "source_language": source_language,
        "translation_text": translation_text,
        "translation_language": translation_language,
        "translation_vector": translation_vector
    }]
    
    # Insert into collection (id is auto-generated)
    collection.insert(data)
    collection.flush()  # Ensure data is written
    print(f"Inserted translation: {source_language} -> {translation_language}")

# Example usage:
insert_translation_data(
    source_text="བཀྲ་ཤིས་བདེ་ལེགས།",
    source_language="tib",
    translation_text="Hello, may you be well",
    translation_language="en",
    embedding_model=embedding_model
)


Inserted translation: tib -> en


In [36]:

# The splitter to create the large chunks (Parents)
parent_splitter = RecursiveCharacterTextSplitter(chunk_size=4000)

# The splitter to create the small chunks (Children)
child_splitter = RecursiveCharacterTextSplitter(chunk_size=400)

In [38]:
from langchain_classic.retrievers import ParentDocumentRetriever
from langchain_core.documents import Document

# Following the field schema defined earlier in the notebook
docs = [
    Document(
        page_content="བཀྲ་ཤིས་བདེ་ལེགས།",
        metadata={
            "source_text": "བཀྲ་ཤིས་བདེ་ལེགས།",
            "source_language": "tib",
            "translation_text": "Hello, may you be well",
            "translation_language": "en",
        }
    ),
    Document(
        page_content="Hello, may you be well",
        metadata={
            "source_text": "Hello, may you be well",
            "source_language": "en",
            "translation_text": "བཀྲ་ཤིས་བདེ་ལེགས།",
            "translation_language": "tib",
        }
    )
]

retriever = ParentDocumentRetriever(
    vectorstore=vectorstore,
    docstore=store,
    child_splitter=child_splitter,
    parent_splitter=parent_splitter,
)

# Add documents to the retriever
retriever.add_documents(docs)

# Use it in your RAG pipeline
query = "What are the specific technical requirements for the project?"

2026-01-05 17:05:07,666 [ERROR][handler]: RPC error: [insert_rows], <DataNotMatchException: (code=1, message=Attempt to insert an unexpected field `text` to collection without enabling dynamic field)>, <Time:{'RPC start': '2026-01-05 17:05:07.473569', 'RPC error': '2026-01-05 17:05:07.665271'}>
Traceback:
Traceback (most recent call last):
  File "d:\work\langraph-api\.venv\Lib\site-packages\pymilvus\decorators.py", line 263, in handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "d:\work\langraph-api\.venv\Lib\site-packages\pymilvus\decorators.py", line 322, in handler
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "d:\work\langraph-api\.venv\Lib\site-packages\pymilvus\decorators.py", line 196, in handler
    raise e from e
  File "d:\work\langraph-api\.venv\Lib\site-packages\pymilvus\decorators.py", line 166, in handler
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "d:\work\langraph-api\.ven

DataNotMatchException: <DataNotMatchException: (code=1, message=Attempt to insert an unexpected field `text` to collection without enabling dynamic field)>