## Installing LightRAG

In [4]:
!pip install lightrag-hku

Collecting lightrag-hku
  Downloading lightrag_hku-1.4.4-py3-none-any.whl.metadata (77 kB)
Collecting aiohttp (from lightrag-hku)
  Downloading aiohttp-3.12.14-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.6 kB)
Collecting configparser (from lightrag-hku)
  Downloading configparser-7.2.0-py3-none-any.whl.metadata (5.5 kB)
Collecting dotenv (from lightrag-hku)
  Downloading dotenv-0.9.9-py2.py3-none-any.whl.metadata (279 bytes)
Collecting future (from lightrag-hku)
  Downloading future-1.0.0-py3-none-any.whl.metadata (4.0 kB)
Collecting nano-vectordb (from lightrag-hku)
  Downloading nano_vectordb-0.0.4.3-py3-none-any.whl.metadata (3.7 kB)
Collecting pandas>=2.0.0 (from lightrag-hku)
  Downloading pandas-2.3.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (91 kB)

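## Official documentation
https://github.com/HKUDS/LightRAG

* **LLM Selection**:

An LLM with at least 32 billion (32B) parameters is recommended.
The context length should be at least 32KB; 64KB is recommended.

* **Embedding Model**:

A high-performance embedding model is essential for RAG.
Mainstream multilingual embedding models are recommended, such as BAAI/bge-m3 and text-embedding-3-large.
Important: the embedding model must be chosen before documents are indexed, and the same model must be used at query time.

* **Reranker Model Configuration**:

Configuring a reranker model can significantly improve LightRAG's retrieval performance.
When a reranker is enabled, setting "mix mode" as the default query mode is recommended.
Mainstream reranker models are recommended, such as BAAI/bge-reranker-v2-m3 or the models provided by services like Jina.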
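
The rule that indexing and querying must share one embedding model can be enforced with a small guard. The following is a minimal sketch, not part of LightRAG: the function name `check_embedding_config` and the marker file `embedding_config.json` are hypothetical, chosen only for illustration.

```python
import json
from pathlib import Path


def check_embedding_config(working_dir: str, model: str, dim: int) -> None:
    """Record the embedding model on first use; refuse a different one later."""
    # Hypothetical marker file, not something LightRAG itself writes.
    marker = Path(working_dir) / "embedding_config.json"
    current = {"model": model, "dim": dim}
    if marker.exists():
        saved = json.loads(marker.read_text(encoding="utf-8"))
        if saved != current:
            raise ValueError(
                f"Index was built with {saved}, but the current setup uses {current}"
            )
    else:
        marker.parent.mkdir(parents=True, exist_ok=True)
        marker.write_text(json.dumps(current), encoding="utf-8")
```

Calling it once before indexing and again before querying would surface a mismatch such as switching from `bge-m3:latest` (1024 dimensions) to `nomic-embed-text:latest` (768 dimensions), both of which appear in this notebook.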

Ollama LLM + Ollama Embedding:
Create a `.env` file:
```env
LLM_BINDING=ollama # talk to the LLM through the Ollama API protocol
LLM_MODEL=mistral-nemo:latest # name of the LLM model to use
LLM_BINDING_HOST=http://localhost:11434 # address and port of the Ollama service
# LLM_BINDING_API_KEY=your_api_key # optional, only when Ollama requires API key authentication
###  Ollama Server context length
OLLAMA_NUM_CTX=8192 # LLM context length; a longer context lets the model take in more text per call

EMBEDDING_BINDING=ollama # bind the LightRAG server's embedding backend to Ollama
EMBEDDING_BINDING_HOST=http://localhost:11434
EMBEDDING_MODEL=bge-m3:latest # name of the embedding model to use
EMBEDDING_DIM=1024
# EMBEDDING_BINDING_API_KEY=your_api_key
```
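
A quick sanity check of such a file can catch a missing key or a non-numeric dimension before the server starts. This is a minimal sketch under stated assumptions: `parse_env` and `check_ollama_settings` are hypothetical helpers, the parser only handles the simple `KEY=VALUE # comment` layout shown above, and the list of required keys is inferred from this example rather than from LightRAG's documentation.

```python
def parse_env(text: str) -> dict[str, str]:
    """Parse KEY=VALUE lines; skip blanks, comment lines, and trailing ' # ...' comments."""
    values = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        values[key.strip()] = value.split(" #", 1)[0].strip()
    return values


def check_ollama_settings(env: dict[str, str]) -> list[str]:
    """Return a list of human-readable problems; an empty list means the file looks plausible."""
    required = (
        "LLM_BINDING", "LLM_MODEL", "LLM_BINDING_HOST",
        "EMBEDDING_BINDING", "EMBEDDING_MODEL", "EMBEDDING_DIM",
    )
    problems = [f"missing {key}" for key in required if not env.get(key)]
    for key in ("OLLAMA_NUM_CTX", "EMBEDDING_DIM"):
        if key in env and not env[key].isdigit():
            problems.append(f"{key} must be an integer, got {env[key]!r}")
    return problems
```

For real projects a dedicated loader such as python-dotenv is the safer choice; the hand-rolled parser here only keeps the sketch dependency-free.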

In [None]:
LLM_BINDING=ollama
LLM_MODEL=gemma3:27b
LLM_BINDING_HOST=http://dandelion-ollama-1:11434
# LLM_BINDING_API_KEY=your_api_key
###  Ollama Server context length
OLLAMA_NUM_CTX=12283 # min=8192 , max=16384

EMBEDDING_BINDING=ollama
EMBEDDING_BINDING_HOST=http://dandelion-ollama-1:11434
EMBEDDING_MODEL=bge-m3:latest
EMBEDDING_DIM=1024

Rerank integration guide:  
https://github.com/HKUDS/LightRAG/blob/main/docs/rerank_integration.md  

Set `enable_rerank=True` to control reranking for each query.

In [None]:
from lightrag import LightRAG, QueryParam
from lightrag.rerank import custom_rerank, RerankModel

# Method 1: Using a custom rerank function with all settings included
async def my_rerank_func(query: str, documents: list, top_n: int = None, **kwargs):
    return await custom_rerank(
        query=query,
        documents=documents,
        model="BAAI/bge-reranker-v2-m3",
        base_url="https://api.your-provider.com/v1/rerank",
        api_key="your_api_key_here",
        top_n=top_n or 10,  # Handle top_n within the function
        **kwargs
    )

rag = LightRAG(
    working_dir="./rag_storage",
    llm_model_func=your_llm_func,
    embedding_func=your_embedding_func,
    rerank_model_func=my_rerank_func,  # Configure rerank function
)

# Query with rerank enabled (default)
result = await rag.aquery(
    "your query",
    param=QueryParam(enable_rerank=True)  # Control rerank per query
)

# Query with rerank disabled
result = await rag.aquery(
    "your query",
    param=QueryParam(enable_rerank=False)
)

# Method 2: Using RerankModel wrapper
rerank_model = RerankModel(
    rerank_func=custom_rerank,
    kwargs={
        "model": "BAAI/bge-reranker-v2-m3",
        "base_url": "https://api.your-provider.com/v1/rerank",
        "api_key": "your_api_key_here",
    }
)

rag = LightRAG(
    working_dir="./rag_storage",
    llm_model_func=your_llm_func,
    embedding_func=your_embedding_func,
    rerank_model_func=rerank_model.rerank,
)

# Control rerank per query
result = await rag.aquery(
    "your query",
    param=QueryParam(
        enable_rerank=True,  # Enable rerank for this query
        chunk_top_k=5       # Number of chunks to keep after reranking
    )
)
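
To make the shape of a rerank function concrete without calling any external service, here is a toy stand-in. It is a sketch, not a real reranker: `overlap_rerank` is a hypothetical name, the word-overlap score is only a placeholder for a model score, and the `{"content": ...}` input and `rerank_score` output fields follow the conventions used in the cells of this notebook.

```python
import asyncio


async def overlap_rerank(query: str, documents: list, top_n: int = None, **kwargs) -> list:
    """Toy reranker: score each document by the share of query words it contains."""
    query_words = set(query.lower().split())
    scored = []
    for doc in documents:
        doc_words = set(doc["content"].lower().split())
        score = len(query_words & doc_words) / max(len(query_words), 1)
        # Copy the document so the caller's list is left untouched.
        scored.append({**doc, "rerank_score": score})
    scored.sort(key=lambda d: d["rerank_score"], reverse=True)
    return scored[: top_n or len(scored)]


if __name__ == "__main__":
    docs = [
        {"content": "vector search finds similar documents"},
        {"content": "reranking can improve retrieval quality"},
    ]
    for doc in asyncio.run(overlap_rerank("does reranking improve quality", docs)):
        print(doc["rerank_score"], doc["content"])
```

Because it has the same `(query, documents, top_n, **kwargs)` signature as `my_rerank_func` above, a function like this could presumably be swapped in for offline experiments.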

``` python
from lightrag.rerank import custom_rerank

# Your custom API endpoint
result = await custom_rerank(
    query="your query",
    documents=documents,
    model="BAAI/bge-reranker-v2-m3",
    base_url="https://api.your-provider.com/v1/rerank",
    api_key="your_api_key_here",
    top_n=10
)
```

Jina

``` python
from lightrag.rerank import jina_rerank

result = await jina_rerank(
    query="your query",
    documents=documents,
    model="BAAI/bge-reranker-v2-m3",
    api_key="your_jina_api_key",
    top_n=10
)
```

Cohere


``` python
from lightrag.rerank import cohere_rerank

result = await cohere_rerank(
    query="your query",
    documents=documents,
    model="rerank-english-v2.0",
    api_key="your_cohere_api_key",
    top_n=10
)
```

Official code:  
https://github.com/HKUDS/LightRAG/blob/main/examples/rerank_example.py  
https://github.com/HKUDS/LightRAG/blob/main/lightrag/rerank.py


In [None]:
"""
LightRAG Rerank Integration Example

This example demonstrates how to use rerank functionality with LightRAG
to improve retrieval quality across different query modes.

Configuration Required:
1. Set your LLM API key and base URL in llm_model_func()
2. Set your embedding API key and base URL in embedding_func()
3. Set your rerank API key and base URL in the rerank configuration
4. Or use environment variables (.env file):
   - RERANK_MODEL=your_rerank_model
   - RERANK_BINDING_HOST=your_rerank_endpoint
   - RERANK_BINDING_API_KEY=your_rerank_api_key

Note: Rerank is now controlled per query via the 'enable_rerank' parameter (default: True)
"""

import asyncio
import os
import numpy as np

from lightrag import LightRAG, QueryParam
from lightrag.rerank import custom_rerank, RerankModel
# from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.llm.ollama import ollama_model_complete, ollama_embed
from lightrag.utils import EmbeddingFunc, setup_logger
from lightrag.kg.shared_storage import initialize_pipeline_status

import requests
import json

# Set up your working directory
WORKING_DIR = "/workspace/yuchen/light_test_rerank"
setup_logger("test_rerank")

if not os.path.exists(WORKING_DIR):
    os.mkdir(WORKING_DIR)

# URL of the Ollama service
OLLAMA_BASE_URL = "http://dandelion-ollama-1:11434"

async def llm_model_func(
    prompt, system_prompt=None, history_messages=[], **kwargs
) -> str:
    return await ollama_model_complete(
        "qwen2.5:0.5b",
        prompt,
        system_prompt=system_prompt,
        history_messages=history_messages,
        # api_key="your_llm_api_key_here",
        base_url=OLLAMA_BASE_URL,
        **kwargs,
    )


async def embedding_func(texts: list[str]) -> np.ndarray:
    return await ollama_embed(
        texts,
        embed_model="nomic-embed-text:latest",
        # api_key="your_embedding_api_key_here",
        base_url=OLLAMA_BASE_URL,
    )

# -------------------------------------------------------------------------
async def ollama_rerank(
    query: str,
    documents: list,
    model: str = "xitao/bge-reranker-v2-m3:latest",
    base_url: str = OLLAMA_BASE_URL,
    top_n: int = None,
    **kwargs
) -> list:
    """
    Custom function that reranks documents through the Ollama service.
    """
    url = f"{base_url}/api/rerank"

    # Payload format expected by the Ollama rerank API
    payload = {
        "model": model,
        "query": query,
        "documents": [doc["content"] for doc in documents],  # plain-text list for the documents parameter
        "top_n": top_n or 10,
    }

    try:
        # Run the blocking HTTP call in a worker thread so the event loop stays responsive
        response = await asyncio.to_thread(requests.post, url, json=payload)
        response.raise_for_status()

        reranked_results = response.json().get("results", [])

        # Convert the results back into LightRAG's document format
        reranked_docs = []
        for result in reranked_results:
            original_index = result["index"]
            score = result["score"]
            doc = documents[original_index].copy()
            doc["rerank_score"] = score
            reranked_docs.append(doc)

        return reranked_docs

    except requests.exceptions.RequestException as e:
        print(f"Error calling Ollama rerank API: {e}")
        return []
# -------------------------------------------------------------------------


async def my_rerank_func(query: str, documents: list, top_n: int = None, **kwargs):
    """Custom rerank function with all settings included"""
    return await ollama_rerank(
        query=query,
        documents=documents,
        top_n=top_n or 10,
        **kwargs,
    )
    # return await custom_rerank(
    #     query=query,
    #     documents=documents,
    #     model="BAAI/bge-reranker-v2-m3",
    #     base_url="https://api.your-rerank-provider.com/v1/rerank",
    #     api_key="your_rerank_api_key_here",
    #     top_n=top_n or 10,
    #     **kwargs,
    # )


async def create_rag_with_rerank():
    """Create LightRAG instance with rerank configuration"""

    # Get embedding dimension
    test_embedding = await embedding_func(["test"])
    embedding_dim = test_embedding.shape[1]
    print(f"Detected embedding dimension: {embedding_dim}")

    # Method 1: Using custom rerank function
    rag = LightRAG(
        working_dir=WORKING_DIR,
        llm_model_func=llm_model_func,
        embedding_func=EmbeddingFunc(
            embedding_dim=embedding_dim,
            max_token_size=8192,
            func=embedding_func,
        ),
        # Rerank Configuration - provide the rerank function
        rerank_model_func=my_rerank_func,
    )

    await rag.initialize_storages()
    await initialize_pipeline_status()

    return rag


async def create_rag_with_rerank_model():
    """Alternative: Create LightRAG instance using RerankModel wrapper"""

    # Get embedding dimension
    test_embedding = await embedding_func(["test"])
    embedding_dim = test_embedding.shape[1]
    print(f"Detected embedding dimension: {embedding_dim}")

    # Method 2: Using RerankModel wrapper
    rerank_model = RerankModel(
        rerank_func=custom_rerank,
        kwargs={
            "model": "xitao/bge-reranker-v2-m3:latest",
            "base_url": OLLAMA_BASE_URL,
        },
        # kwargs={
        #     "model": "BAAI/bge-reranker-v2-m3",
        #     "base_url": "https://api.your-rerank-provider.com/v1/rerank",
        #     "api_key": "your_rerank_api_key_here",
        # },
    )

    rag = LightRAG(
        working_dir=WORKING_DIR,
        llm_model_func=llm_model_func,
        embedding_func=EmbeddingFunc(
            embedding_dim=embedding_dim,
            max_token_size=8192,
            func=embedding_func,
        ),
        rerank_model_func=rerank_model.rerank,
    )

    await rag.initialize_storages()
    await initialize_pipeline_status()

    return rag


async def test_rerank_with_different_settings():
    """
    Test rerank functionality with different enable_rerank settings
    """
    print("Setting up LightRAG with Rerank functionality...")

    rag = await create_rag_with_rerank()

    # Insert sample documents
    sample_docs = [
        "Reranking improves retrieval quality by re-ordering documents based on relevance.",
        "LightRAG is a powerful retrieval-augmented generation system with multiple query modes.",
        "Vector databases enable efficient similarity search in high-dimensional embedding spaces.",
        "Natural language processing has evolved with large language models and transformers.",
        "Machine learning algorithms can learn patterns from data without explicit programming.",
    ]

    print("Inserting sample documents...")
    await rag.ainsert(sample_docs)

    query = "How does reranking improve retrieval quality?"
    print(f"\nTesting query: '{query}'")
    print("=" * 80)

    # Test with rerank enabled (default)
    print("\nTesting with enable_rerank=True (default):")
    result_with_rerank = await rag.aquery(
        query,
        param=QueryParam(
            mode="naive",
            top_k=10,
            chunk_top_k=5,
            enable_rerank=True,  # Explicitly enable rerank
        ),
    )
    print(f"   Result length: {len(result_with_rerank)} characters")
    print(f"   Preview: {result_with_rerank[:100]}...")

    # Test with rerank disabled
    print("\nTesting with enable_rerank=False:")
    result_without_rerank = await rag.aquery(
        query,
        param=QueryParam(
            mode="naive",
            top_k=10,
            chunk_top_k=5,
            enable_rerank=False,  # Disable rerank
        ),
    )
    print(f"   Result length: {len(result_without_rerank)} characters")
    print(f"   Preview: {result_without_rerank[:100]}...")

    # Test with default settings (enable_rerank defaults to True)
    print("\nTesting with default settings (enable_rerank defaults to True):")
    result_default = await rag.aquery(
        query, param=QueryParam(mode="naive", top_k=10, chunk_top_k=5)
    )
    print(f"   Result length: {len(result_default)} characters")
    print(f"   Preview: {result_default[:100]}...")


async def test_direct_rerank():
    """Test rerank function directly"""
    print("\nDirect Rerank API Test")
    print("=" * 40)

    documents = [
        {"content": "Reranking significantly improves retrieval quality"},
        {"content": "LightRAG supports advanced reranking capabilities"},
        {"content": "Vector search finds semantically similar documents"},
        {"content": "Natural language processing with modern transformers"},
        {"content": "The quick brown fox jumps over the lazy dog"},
    ]

    query = "rerank improve quality"
    print(f"Query: '{query}'")
    print(f"Documents: {len(documents)}")

    try:
        reranked_docs = await custom_rerank(
            query=query,
            documents=documents,
            model="BAAI/bge-reranker-v2-m3",
            base_url="https://api.your-rerank-provider.com/v1/rerank",
            api_key="your_rerank_api_key_here",
            top_n=3,
        )

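        print("\nRerank Results:")
        for i, doc in enumerate(reranked_docs):
            score = doc.get("rerank_score")
            # Format the score only when it is numeric; fall back to "N/A" otherwise
            score_text = f"{score:.4f}" if isinstance(score, (int, float)) else "N/A"
            content = doc.get("content", "")[:60]
            print(f"  {i+1}. Score: {score_text} | {content}...")

    except Exception as e:
        print(f"Rerank failed: {e}")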


async def main():
    """Main example function"""
    print("LightRAG Rerank Integration Example")
    print("=" * 60)

    try:
        # Test rerank with different enable_rerank settings
        await test_rerank_with_different_settings()

        # Test direct rerank
        await test_direct_rerank()

        print("\nExample completed successfully!")
        print("\nKey Points:")
        print("   - Rerank is now controlled per query via 'enable_rerank' parameter")
        print("   - Default value for enable_rerank is True")
        print("   - Rerank function is configured at LightRAG initialization")
        print("   - Per-query enable_rerank setting overrides default behavior")
        print(
            "   - If enable_rerank=True but no rerank model is configured, a warning is issued"
        )
        print("   - Monitor API usage and costs when using rerank services")

    except Exception as e:
        print(f"\nExample failed: {e}")
        import traceback

        traceback.print_exc()


if __name__ == "__main__":
    asyncio.run(main())

'''
import asyncio
import os
import inspect
import logging
from lightrag import LightRAG, QueryParam
from lightrag.llm.ollama import ollama_model_complete, ollama_embed
from lightrag.utils import EmbeddingFunc

WORKING_DIR = "/workspace/yuchen/light_test"

logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)

if not os.path.exists(WORKING_DIR):
    os.mkdir(WORKING_DIR)

rag = LightRAG(
    working_dir=WORKING_DIR,
    llm_model_func=ollama_model_complete,
    llm_model_name="qwen2.5:0.5b",
    llm_model_max_async=4,
    llm_model_max_token_size=32768,
    llm_model_kwargs={"host": "http://dandelion-ollama-1:11434", "options": {"num_ctx": 32768}},

    embedding_func=EmbeddingFunc(
        embedding_dim=768,
        max_token_size=8192,
        func=lambda texts: ollama_embed(
            texts, embed_model="nomic-embed-text:latest", host="http://dandelion-ollama-1:11434"
        ),
    ),
)

with open("../data/PDF_file.txt", "r", encoding="utf-8") as f:
    rag.insert(f.read())
# async def main():
#     with open("./data/PDF_file.txt", "r", encoding="utf-8") as f:
#         await rag.ainsert(f.read())
        
# await main()

# Perform naive search
print(
    rag.query("What are the top themes in this story?", param=QueryParam(mode="naive"))
)

# Perform local search
print(
    rag.query("What are the top themes in this story?", param=QueryParam(mode="local"))
)

# Perform global search
print(
    rag.query("What are the top themes in this story?", param=QueryParam(mode="global"))
)

# Perform hybrid search
print(
    rag.query("What are the top themes in this story?", param=QueryParam(mode="hybrid"))
)

# stream response
resp = rag.query(
    "What are the top themes in this story?",
    param=QueryParam(mode="hybrid", stream=True),
)

async def print_stream(stream):
    async for chunk in stream:
        print(chunk, end="", flush=True)

if inspect.isasyncgen(resp):
    asyncio.run(print_stream(resp))

else:
    print(resp)

    '''
'''
    INFO:Init {'embedding_dim': 768, 'metric': 'cosine', 'storage_file': '/workspace/yuchen/light_test/vdb_entities.json'} 0 data
    INFO:Init {'embedding_dim': 768, 'metric': 'cosine', 'storage_file': '/workspace/yuchen/light_test/vdb_relationships.json'} 0 data
    INFO:Init {'embedding_dim': 768, 'metric': 'cosine', 'storage_file': '/workspace/yuchen/light_test/vdb_chunks.json'} 0 data
    Rerank is enabled but no rerank_model_func provided. Reranking will be skipped.
    '''
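
The result-mapping step inside `ollama_rerank` above can be exercised offline, without a running rerank endpoint. The sketch below assumes the same response shape that function assumes, a list of `{"index": ..., "score": ...}` entries; `apply_rerank_results` is a hypothetical helper name, and the bounds check and final sort are additions for robustness rather than behavior taken from LightRAG.

```python
def apply_rerank_results(documents: list, results: list, top_n: int = None) -> list:
    """Map [{"index": i, "score": s}, ...] back onto the original documents."""
    reranked = []
    for item in results:
        index = item.get("index")
        if not isinstance(index, int) or not 0 <= index < len(documents):
            continue  # skip entries that do not point at a known document
        doc = dict(documents[index])  # copy, so the input list is not mutated
        doc["rerank_score"] = float(item.get("score", 0.0))
        reranked.append(doc)
    # Highest score first, regardless of the order the service returned.
    reranked.sort(key=lambda d: d["rerank_score"], reverse=True)
    return reranked[:top_n] if top_n else reranked
```

Keeping this step separate from the HTTP call makes it easy to test with canned responses before pointing the code at a live service.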

## Tutorial
https://yourmi.csdn.net/67c529ffd649b06b61c6f690.html

In [2]:
import requests
import json
import os

# Service name and port of the LightRAG Server on the Docker network
# 'lightrag' is the service name defined in docker-compose.yml
LIGHTRAG_SERVER_URL = "http://lightrag:9621"

print(f"嘗試連接 LightRAG Server: {LIGHTRAG_SERVER_URL}")

# --- Step 1: insert documents into the LightRAG Server ---
documents = [
    "人工智能(AI)是计算机科学的一个分支,致力于开发能模拟人类智能的系统。",
    "机器学习是AI的核心技术之一,它使计算机能够从数据中学习和改进。",
    "深度学习是机器学习的一个子领域,使用多层神经网络处理复杂问题。"
]

insert_url = f"{LIGHTRAG_SERVER_URL}/documents/text"
headers = {"Content-Type": "application/json"}

print("\n--- 插入文件 ---")
for i, doc_content in enumerate(documents):
    payload = {
        "text": doc_content,
        "description": f"Document {i+1} about AI concepts"
    }
    try:
        response = requests.post(insert_url, headers=headers, data=json.dumps(payload))
        response.raise_for_status()  # raises HTTPError when the request was not successful
        print(f"文件 {i+1} 插入成功: {response.json()}")
    except requests.exceptions.RequestException as e:
        print(f"文件 {i+1} 插入失敗: {e}")
        # e.response is None when no HTTP response was received (e.g. connection refused)
        print(f"回應內容: {e.response.text if e.response is not None else 'N/A'}")
        # If the insert fails, check the LightRAG Server logs


# --- Step 2: trigger a document scan (only if files are mounted into input_dir via volumes) ---
# If you insert through /documents/text, this step is not needed because the content is processed directly.
# If your files sit in the LightRAG container's input_dir, a scan is required.
# This notebook inserts through /documents/text, so the step is optional.
# scan_url = f"{LIGHTRAG_SERVER_URL}/documents/scan"
# print("\n--- Trigger document scan (if needed) ---")
# try:
#     response = requests.post(scan_url, timeout=1800)  # use a long timeout
#     response.raise_for_status()
#     print(f"Document scan triggered: {response.json()}")
# except requests.exceptions.RequestException as e:
#     print(f"Document scan failed: {e}")
#     print(f"Response body: {response.text if 'response' in locals() else 'N/A'}")

# --- Step 3: run a test query ---
query_text = "请解释AI、机器学习和深度学习之间的关系"
query_url = f"{LIGHTRAG_SERVER_URL}/query"

query_payload = {
    "query": query_text,
    "mode": "hybrid"  # query modes supported by LightRAG include "hybrid", "local", "global", "mix"
}

print(f"\n--- 執行查詢: {query_text} ---")
try:
    response = requests.post(query_url, headers=headers, data=json.dumps(query_payload))
    response.raise_for_status()
    result = response.json()
    print("查詢結果:")
    print(json.dumps(result, indent=2, ensure_ascii=False))  # pretty-print and keep non-ASCII text readable
except requests.exceptions.RequestException as e:
    print(f"查詢失敗: {e}")
    print(f"回應內容: {e.response.text if e.response is not None else 'N/A'}")

嘗試連接 LightRAG Server: http://lightrag:9621

--- 插入文件 ---
文件 1 插入成功: {'status': 'success', 'message': 'Text successfully received. Processing will continue in background.'}
文件 2 插入成功: {'status': 'success', 'message': 'Text successfully received. Processing will continue in background.'}
文件 3 插入成功: {'status': 'success', 'message': 'Text successfully received. Processing will continue in background.'}

--- 執行查詢: 请解释AI、机器学习和深度学习之间的关系 ---
查詢結果:
{
  "response": "Sorry, I'm not able to provide an answer to that question.[no-context]"
}
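
The insert responses above say that processing continues in the background, so one plausible reason for the `[no-context]` answer is that the query ran before indexing finished (a model that fails to extract anything would look the same). A minimal sketch of waiting and retrying follows; `wait_until` and `has_context` are hypothetical helpers, and no LightRAG status endpoint is assumed.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], timeout: float = 60.0, interval: float = 2.0) -> bool:
    """Call check() repeatedly until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


def has_context(answer: str) -> bool:
    """True when the answer does not carry the '[no-context]' marker seen above."""
    return "[no-context]" not in answer
```

In the cell above, the `check` callable could re-send `query_payload` to `query_url` and pass the returned `"response"` string to `has_context`; the timeout would need to match how long indexing takes on the hardware in use.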


In [7]:
import os
import logging
from lightrag import LightRAG, QueryParam
from lightrag.llm import ollama_model_complete, ollama_embedding
from lightrag.utils import EmbeddingFunc

# Set the logging level
logging.basicConfig(format="%(levelname)s:%(message)s", level=logging.INFO)

# Create the working directory
WORKING_DIR = "./my_rag_project"
os.makedirs(WORKING_DIR, exist_ok=True)

# Initialize LightRAG with Ollama models
rag = LightRAG(
    working_dir=WORKING_DIR,
    llm_model_func=ollama_model_complete,
    llm_model_name="gemma2:2b",  # use the Gemma 2 2B model
    llm_model_max_async=4,  # maximum number of concurrent requests
    llm_model_max_token_size=32768,
    llm_model_kwargs={
        "host": "http://localhost:11434",  # Ollama service address
        "options": {"num_ctx": 32768}  # context window size
    },
    embedding_func=EmbeddingFunc(
        embedding_dim=768,
        max_token_size=8192,
        func=lambda texts: ollama_embedding(
            texts,
            embed_model="nomic-embed-text",  # use nomic-embed-text as the embedding model
            host="http://localhost:11434"
        ),
    ),
)

# Sample documents to insert and query
documents = [
    "人工智能(AI)是计算机科学的一个分支,致力于开发能模拟人类智能的系统。",
    "机器学习是AI的核心技术之一,它使计算机能够从数据中学习和改进。",
    "深度学习是机器学习的一个子领域,使用多层神经网络处理复杂问题。"
]

# Insert the documents
rag.insert(documents)

# Query with each retrieval mode
modes = ["naive", "local", "global", "hybrid"]
query = "请解释AI、机器学习和深度学习之间的关系"

for mode in modes:
    print(f"\nQuery result in {mode} mode:")
    result = rag.query(query, param=QueryParam(mode=mode))
    print(result)

ImportError: cannot import name 'ollama_chat_completion' from 'lightrag.llm' (/opt/conda/lib/python3.10/site-packages/lightrag/llm/__init__.py)
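
Import errors like this one usually mean the tutorial targets a different package layout than the installed version. The earlier cell in this notebook imports `ollama_model_complete` and `ollama_embed` from `lightrag.llm.ollama`, while the tutorial cell imports `ollama_embedding` from `lightrag.llm`. A minimal, version-tolerant sketch is to try the candidates in order; `first_available` is a hypothetical helper built only on the standard library.

```python
import importlib


def first_available(candidates):
    """Return the first object that can be imported from (module, attribute) pairs."""
    errors = []
    for module_name, attr in candidates:
        try:
            module = importlib.import_module(module_name)
            return getattr(module, attr)
        except (ImportError, AttributeError) as exc:
            errors.append(f"{module_name}.{attr}: {exc}")
    raise ImportError("none of the candidates could be imported:\n" + "\n".join(errors))


# Example, using only names that appear in this notebook:
# ollama_embed = first_available([
#     ("lightrag.llm.ollama", "ollama_embed"),
#     ("lightrag.llm", "ollama_embedding"),
# ])
```

Pinning the package version the tutorial was written for is the alternative; the fallback only helps when the functions kept the same behavior under a new name.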

In [1]:
import os
from lightrag import LightRAG, QueryParam
from lightrag.llm import ollama

# Create a ChatOllama instance
# from langchain_ollama import ChatOllama

Chatllm = ollama(
    base_url='http://dandelion-ollama-1:11434', 
    model="llama3.1:8b",
    temperature=0.0,
    num_predict=51200
)

# Create the working directory
WORKING_DIR = "./my_rag_project"
os.makedirs(WORKING_DIR, exist_ok=True)

# Initialize LightRAG
rag = LightRAG(
    working_dir=WORKING_DIR,
    llm_model_func=Chatllm
)

# Prepare sample documents
documents = [
    "人工智能(AI)是计算机科学的一个分支,致力于开发能模拟人类智能的系统。",
    "机器学习是AI的核心技术之一,它使计算机能够从数据中学习和改进。",
    "深度学习是机器学习的一个子领域,使用多层神经网络处理复杂问题。"
]

# Insert the documents
rag.insert(documents)

# Run a test query
query = "请解释AI、机器学习和深度学习之间的关系"
result = rag.query(query, param=QueryParam(mode="hybrid"))
print(result)

ModuleNotFoundError: No module named 'lightrag'

Custom OllamaReranker as a LangChain DocumentCompressor

Create a class that wraps the Ollama API.


In [33]:
from langchain_core.documents.compressor import BaseDocumentCompressor
# from langchain_core.documents.compressor import BaseDocumentCompressor
from langchain_core.documents import Document
from typing import List, Any
import requests
import re
from pydantic import BaseModel, Field

class OllamaReranker(BaseDocumentCompressor, BaseModel):
    model: str = Field(default="xitao/bge-reranker-v2-m3")
    base_url: str = Field(default="http://localhost:11434")

    def compress_documents(
        self,
        documents: List[Document],
        query: str,
        **kwargs: Any  # explicitly tell Pydantic that any extra arguments are accepted
    ) -> List[Document]:
        print(f"[debug] compress_documents called with kwargs: {kwargs}")

        if not documents:
            return []

        prompt = f"Query: {query}\n\n"
        for i, doc in enumerate(documents):
            prompt += f"[{i}] {doc.page_content.strip()}\n"

        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False
        }

        try:
            response = requests.post(f"{self.base_url}/api/generate", json=payload)
            result = response.json()
            ranked_indices = self._parse_output_indices(result["response"])
        except Exception as e:
            print("Ollama rerank failed:", e)
            return documents

        return [documents[i] for i in ranked_indices if i < len(documents)]

    def _parse_output_indices(self, text: str) -> List[int]:
        return list(map(int, re.findall(r"\d+", text)))

File /opt/conda/lib/python3.10/site-packages/langchain_core/retrievers.py:259, in BaseRetriever.invoke(self, input, config, **kwargs)
    257 kwargs_ = kwargs if self._expects_other_args else {}
    258 if self._new_arg_supported:
--> 259     result = self._get_relevant_documents(
    260         input, run_manager=run_manager, **kwargs_
    261     )
    262 else:
    263     result = self._get_relevant_documents(input, **kwargs_)

File /opt/conda/lib/python3.10/site-packages/langchain/retrievers/contextual_compression.py:44, in ContextualCompressionRetriever._get_relevant_documents(self, query, run_manager, **kwargs)
     40 docs = self.base_retriever.invoke(
     41     query, config={"callbacks": run_manager.get_child()}, **kwargs
     42 )
     43 if docs:
---> 44     compressed_docs = self.base_compressor.compress_documents(
     45         docs, query, callbacks=run_manager.get_child()
     46     )
     47     return list(compressed_docs)
     48 else:

TypeError: OllamaReranker.compress_documents() got an unexpected keyword argument 'callbacks'


In [None]:
from FlagEmbedding import FlagLLMReranker
reranker = FlagLLMReranker('BAAI/bge-reranker-v2-gemma', use_fp16=True) # Setting use_fp16 to True speeds up computation with a slight performance degradation
# reranker = FlagLLMReranker('BAAI/bge-reranker-v2-gemma', use_bf16=True) # You can also set use_bf16=True to speed up computation with a slight performance degradation

score = reranker.compute_score(['query', 'passage'])
print(score)

scores = reranker.compute_score([['what is panda?', 'hi'], ['what is panda?', 'The giant panda (Ailuropoda melanoleuca), sometimes called a panda bear or simply panda, is a bear species endemic to China.']])
print(scores)


Loading checkpoint shards:   0%|          | 0/3 [00:00<?, ?it/s]

In [8]:
from langchain_community.document_loaders import PyPDFLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter

# Load the PDF file
loader = PyPDFLoader('./../data/PDF_file.pdf')
# docs = loader.load()

# Specify the text splitter
text_splitter = RecursiveCharacterTextSplitter(chunk_size=512, chunk_overlap=0)
custom_chunks = loader.load_and_split(text_splitter=text_splitter)
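
To see what `chunk_size` and `chunk_overlap` control, the sketch below cuts a string into fixed character windows. It is only an illustration of the two parameters: `split_fixed` is a hypothetical function, and `RecursiveCharacterTextSplitter` itself prefers to break on separators such as paragraphs and sentences, so its chunks will generally differ.

```python
def split_fixed(text: str, chunk_size: int = 512, chunk_overlap: int = 0) -> list[str]:
    """Cut text into windows of chunk_size characters that overlap by chunk_overlap."""
    if chunk_size <= 0 or not 0 <= chunk_overlap < chunk_size:
        raise ValueError("need chunk_size > 0 and 0 <= chunk_overlap < chunk_size")
    step = chunk_size - chunk_overlap  # how far each window advances
    return [text[i : i + chunk_size] for i in range(0, len(text), step)]
```

With `chunk_overlap=0`, as in the cell above, neighbouring chunks share no text, so a sentence that straddles a boundary is split between two chunks; a small overlap is a common way to soften that.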

In [None]:
from langchain_ollama import OllamaEmbeddings

embeddings = OllamaEmbeddings(
    base_url='http://dandelion-ollama-1:11434', 
    model="nomic-embed-text"
)

In [9]:
from langchain_chroma import Chroma

# Persist the database to disk
vectorstore_db = Chroma.from_documents(
    custom_chunks, 
    embedding=embeddings, 
    persist_directory = "./Chroma_db2",
    collection_name="little_prince_chroma"
)

retriever_Chroma_db = vectorstore_db.as_retriever(search_kwargs={"k": 5}) # retrieve the 5 most relevant documents
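
Conceptually, a retriever with `k=5` embeds the query, scores every stored vector against it, and returns the five best matches. The sketch below shows that idea with cosine similarity on plain Python lists. It is an assumption-laden illustration: `cosine` and `top_k` are hypothetical helpers, a real vector store uses an index rather than a full scan, and the distance metric Chroma applies depends on how the collection is configured.

```python
import math


def cosine(a, b):
    """Cosine similarity of two equal-length vectors; 0.0 if either has zero length."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def top_k(query_vec, doc_vecs, k=5):
    """Return (index, similarity) pairs for the k most similar vectors."""
    scored = [(i, cosine(query_vec, vec)) for i, vec in enumerate(doc_vecs)]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]
```

A reranker then reorders only these `k` candidates, which is why a larger `k` gives it more to work with at the cost of more scoring calls.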


In [23]:
from langchain.retrievers import ContextualCompressionRetriever

ollama_reranker = OllamaReranker(
    model="xitao/bge-reranker-v2-m3",
    base_url="http://dandelion-ollama-1:11434"
)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=ollama_reranker,
    base_retriever=retriever_Chroma_db
)

In [34]:

docs = compression_retriever.get_relevant_documents("誰是小王子？")
for i, doc in enumerate(docs):
    print(f"[{i}] {doc.page_content[:100]}...")

TypeError: OllamaReranker.compress_documents() got an unexpected keyword argument 'callbacks'
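
The traceback shows `ContextualCompressionRetriever` calling `compress_documents(docs, query, callbacks=...)`, so the method on the compressor has to accept a `callbacks` keyword. The mechanism can be reproduced without LangChain; in this sketch the class and function names are hypothetical stand-ins, and documents are plain strings for brevity.

```python
from typing import Any, List, Optional


class StrictCompressor:
    """Signature without callbacks: fails when called the way the retriever calls it."""

    def compress_documents(self, documents: List[str], query: str) -> List[str]:
        return documents


class TolerantCompressor:
    """Signature that accepts the callbacks keyword the retriever passes."""

    def compress_documents(
        self, documents: List[str], query: str, callbacks: Optional[Any] = None
    ) -> List[str]:
        return documents


def call_like_retriever(compressor, documents: List[str], query: str) -> List[str]:
    """Mimic the call in the traceback: callbacks is always passed by keyword."""
    return compressor.compress_documents(documents, query, callbacks=object())
```

Since the `OllamaReranker` cell shown earlier already declares `**kwargs`, the error may come from an older definition still held by the kernel or by an existing retriever object; re-running the class cell and then rebuilding `ollama_reranker` and `compression_retriever` is worth trying first.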

In [35]:
from langchain_core.documents import Document
from langchain_core.runnables import Runnable
from typing import List, Any
import requests
import re
from abc import ABC, abstractmethod

class ManualDocumentCompressor(Runnable, ABC):
    @abstractmethod
    def compress_documents(
        self, documents: List[Document], query: str, **kwargs
    ) -> List[Document]:
        ...

    def invoke(self, input: Any, **kwargs: Any) -> Any:
        return self.compress_documents(input["documents"], input["query"], **kwargs)


class OllamaReranker(ManualDocumentCompressor):
    def __init__(self, model="xitao/bge-reranker-v2-m3", base_url="http://localhost:11434"):
        self.model = model
        self.base_url = base_url

    def compress_documents(self, documents: List[Document], query: str, **kwargs) -> List[Document]:
        print(f"[debug] compress_documents called with kwargs: {kwargs}")

        if not documents:
            return []

        prompt = f"Query: {query}\n\n"
        for i, doc in enumerate(documents):
            prompt += f"[{i}] {doc.page_content.strip()}\n"

        payload = {
            "model": self.model,
            "prompt": prompt,
            "stream": False
        }

        try:
            response = requests.post(f"{self.base_url}/api/generate", json=payload)
            result = response.json()
            ranked_indices = self._parse_output_indices(result["response"])
        except Exception as e:
            print("Ollama rerank failed:", e)
            return documents

        return [documents[i] for i in ranked_indices if i < len(documents)]

    def _parse_output_indices(self, text: str) -> List[int]:
        return list(map(int, re.findall(r"\d+", text)))
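
`_parse_output_indices` above collects every run of digits in the model's reply, so numbers that are not document indices, repeated indices, and out-of-range values all leak into the ranking. A stricter variant is sketched below. It assumes the model answers in the same `[i]` bracket notation the prompt uses, which is not guaranteed, and `parse_ranked_indices` is a hypothetical name.

```python
import re
from typing import List


def parse_ranked_indices(text: str, n_docs: int) -> List[int]:
    """Extract bracketed indices like [2], keep each valid one once, then append the rest."""
    ranked: List[int] = []
    for match in re.findall(r"\[(\d+)\]", text):
        index = int(match)
        if index < n_docs and index not in ranked:
            ranked.append(index)
    # Documents the model did not mention keep their original relative order.
    remaining = [i for i in range(n_docs) if i not in ranked]
    return ranked + remaining
```

Appending the unmentioned documents means a vague or empty reply degrades to the retriever's original order instead of silently dropping results.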


In [36]:
compression_retriever = ContextualCompressionRetriever(
    base_compressor=ollama_reranker,  # this is still OK
    base_retriever=retriever_Chroma_db
)

In [38]:
from langchain.retrievers import ContextualCompressionRetriever

ollama_reranker = OllamaReranker(
    model="xitao/bge-reranker-v2-m3",
    base_url="http://dandelion-ollama-1:11434"
)
compression_retriever = ContextualCompressionRetriever(
    base_compressor=ollama_reranker,
    base_retriever=retriever_Chroma_db
)

ValidationError: 1 validation error for ContextualCompressionRetriever
base_compressor
  Input should be a valid dictionary or instance of BaseDocumentCompressor [type=model_type, input_value=<__main__.OllamaReranker object at 0x7b9f096d9570>, input_type=OllamaReranker]
    For further information visit https://errors.pydantic.dev/2.11/v/model_type

In [37]:
docs = compression_retriever.get_relevant_documents("誰是小王子？")


TypeError: OllamaReranker.compress_documents() got an unexpected keyword argument 'callbacks'