# KB-Bridge: Intelligent Knowledge Base Search with MCP

**KB-Bridge** is a Model Context Protocol (MCP) server that provides search and retrieval over knowledge bases. It supports multiple backend providers (such as Dify) and offers semantic search, query rewriting, and automatic answer quality evaluation.

**Use Case**: KB-Bridge is ideal for contract analysis (e.g., [ContractNLI](https://stanfordnlp.github.io/contract-nli/)), document Q&A, and knowledge base search.

## What You'll Learn

- Work through real-world examples based on the ContractNLI dataset
- Query knowledge bases with natural language
- Guide answers with custom instructions and query rewriting
- Discover files and filter queries by document

## Prerequisites

Before running this notebook:

1. **Install KB-Bridge**: `pip install kbbridge`
2. **Install MCP client**: `pip install mcp`
3. **Configure credentials**: Create a `.env` file with your backend credentials (see [README](https://github.com/egpivo/kb-bridge) for details)
4. **Start the server**: 
   - **Option A (Recommended)**: Use the server utilities loaded in the next cell, then run `start_server()` or `ensure_server_running()`
   - **Option B**: Start manually: `python -m kbbridge.server --host 0.0.0.0 --port 5210`

## Quick Links

- [PyPI Package](https://pypi.org/project/kbbridge/)
- [GitHub Repository](https://github.com/egpivo/kb-bridge)
- [ContractNLI Dataset](https://stanfordnlp.github.io/contract-nli/)


In [None]:
# Import server management utilities
import sys
from pathlib import Path

# Add examples directory to path
cwd = Path.cwd()
if (cwd / 'examples' / 'utils.py').exists():
    sys.path.insert(0, str(cwd / 'examples'))
elif (cwd.parent / 'examples' / 'utils.py').exists():
    sys.path.insert(0, str(cwd.parent / 'examples'))

try:
    from utils import start_server, stop_server, show_logs, check_server_status, logs
    
    def ensure_server_running(port=5210):
        """Start server if not running"""
        if not check_server_status():
            start_server(port=port, kill_existing=True)
        else:
            print("Server already running")
    
    print("✓ Server utilities loaded")
    print("Use: start_server(port=5210) or ensure_server_running()")
except ImportError:
    print("⚠ Utils not available. Start server manually: python -m kbbridge.server --port 5210")


## Server Management

Start the server before running examples:

- `start_server(port=5210)` - Start server
- `ensure_server_running()` - Auto-start if needed
- `show_logs()` - View logs


In [None]:
# Start server if not running
ensure_server_running(port=5210)
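
A freshly started server may need a moment before it accepts connections. The sketch below polls the TCP port until something is listening. `wait_for_port` is a hypothetical helper, not part of `kbbridge` or the `utils` module, and an open port only shows that a process is listening, not that the MCP endpoint is healthy.

```python
import socket
import time


def wait_for_port(host="localhost", port=5210, timeout=10.0, interval=0.25):
    """Return True once a TCP connection to host:port succeeds, False on timeout."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            # A successful connect means something is listening on the port.
            with socket.create_connection((host, port), timeout=interval):
                return True
        except OSError:
            if time.monotonic() >= deadline:
                return False
            time.sleep(interval)
```

For example, `wait_for_port(port=5210)` can be called right after `ensure_server_running(port=5210)` and before the first query.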


## Real-World Examples: ContractNLI Dataset

Examples from the [ContractNLI dataset](https://stanfordnlp.github.io/contract-nli/) demonstrating legal document analysis.

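**Note**: These examples rely on `json`, `ClientSession`, `SERVER_URL`, and `RESOURCE_ID`, which are defined in the setup cell that precedes section 1 (Basic Assistant Query). Run that cell first, with `RESOURCE_ID` set to the ID of a knowledge base that contains contract documents.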


### Example 1: Non-Compete Clause

Query with custom instructions to extract specific legal terms.


In [None]:
async def example_non_compete_clause():
    """Query about non-compete clause restrictions"""
    query = "Does the agreement include a non-compete clause restricting the employee from joining competitors?"
    
    custom_instructions = """
    Extract: time periods (e.g., "12 months"), geographic scope (e.g., "50-mile radius"), 
    scope of restriction, and any exceptions. Cite exact text from the document.
    """
    
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": query,
            "custom_instructions": custom_instructions
        })
        
        response_data = json.loads(result.content[0].text)
        
        if "answer" in response_data:
            print("Answer:", response_data["answer"])
            if "sources" in response_data:
                print(f"\nSources ({len(response_data['sources'])}):")
                for source in response_data["sources"][:3]:
                    print(f"  - {source.get('title', 'Unknown')} (score: {source.get('score', 0):.3f})")
        else:
            print("Error:", response_data.get("error", "Unknown error"))

await example_non_compete_clause()


### Example 2: Termination Notice

Using query rewriting to improve search results.


In [None]:
async def example_termination_notice():
    """Query about termination notice requirements with query rewriting"""
    query = "What is the notice period required for contract termination by either party?"
    
    async with ClientSession(SERVER_URL) as session:
        # Query with rewriting enabled for better search results
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": query,
            "enable_query_rewriting": True,
            "custom_instructions": "Extract exact notice period (duration, method, effective date). Cite contract language."
        })
        
        response_data = json.loads(result.content[0].text)
        
        if "answer" in response_data:
            print("Answer:", response_data["answer"])
            if "sources" in response_data:
                print("\nTop Sources:")
                for source in response_data["sources"][:3]:
                    print(f"  - {source.get('title', 'Unknown')} (score: {source.get('score', 0):.3f})")
        else:
            print("Error:", response_data.get("error", "Unknown error"))

await example_termination_notice()


In [None]:
import asyncio
import json
from mcp import ClientSession

# Server URL - adjust if your server is running on a different host/port
SERVER_URL = "http://localhost:5210/mcp"

# Example resource ID - replace with your actual resource ID
RESOURCE_ID = "your-resource-id"
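
The cells in this notebook pass `SERVER_URL` directly to `ClientSession`. In the official MCP Python SDK, `ClientSession` is constructed from a pair of read/write streams rather than a URL, so if `ClientSession(SERVER_URL)` fails in your environment, a helper along the following lines may work. This is a sketch under two assumptions the source does not confirm: that the server speaks the streamable HTTP transport at `/mcp`, and that your installed `mcp` version exposes `streamablehttp_client`. The name `open_session` is hypothetical.

```python
from contextlib import asynccontextmanager


@asynccontextmanager
async def open_session(url="http://localhost:5210/mcp"):
    """Yield an initialized MCP ClientSession connected to `url`."""
    # Imported lazily so this cell still loads if `mcp` is not installed yet.
    from mcp import ClientSession
    from mcp.client.streamable_http import streamablehttp_client

    # The transport yields read/write streams plus a session-id callback
    # (unused here).
    async with streamablehttp_client(url) as (read_stream, write_stream, _):
        async with ClientSession(read_stream, write_stream) as session:
            await session.initialize()  # MCP handshake before any tool call
            yield session
```

With this helper, `async with open_session(SERVER_URL) as session:` would replace `async with ClientSession(SERVER_URL) as session:` in the examples.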


## 1. Basic Assistant Query

The `assistant` tool is the primary tool for answering questions from your knowledge base.


In [None]:
async def basic_query_example():
    """Basic query example"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": "What are the safety protocols?"
        })
        
        # Parse the JSON response
        response_data = json.loads(result.content[0].text)
        print("Answer:", response_data.get("answer", "No answer found"))
        
        # Display sources if available
        if "sources" in response_data:
            print("\nSources:")
            for source in response_data["sources"][:5]:  # Show first 5 sources
                print(f"  - {source.get('title', 'Unknown')} (score: {source.get('score', 0):.3f})")

# Run the example
await basic_query_example()


## 2. Assistant with Custom Instructions

Use `custom_instructions` to provide domain-specific guidance for the answer extraction. Useful for legal documents, contracts (e.g., [ContractNLI](https://stanfordnlp.github.io/contract-nli/)), or domain-specific knowledge bases.
 

In [None]:
async def custom_instructions_example():
    """Example with custom instructions"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": "What is the maternity leave policy?",
            "custom_instructions": "Focus on HR compliance and legal requirements. Cite specific articles or sections."
        })
        
        response_data = json.loads(result.content[0].text)
        print("Answer:", response_data.get("answer", "No answer found"))

await custom_instructions_example()


## 3. Assistant with Query Rewriting

Set `enable_query_rewriting` to `True` to allow LLM-based query expansion and relaxation, which can help with short or vague queries.


In [None]:
async def query_rewriting_example():
    """Example with query rewriting enabled"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": "safety rules",
            "enable_query_rewriting": True  # Enables LLM-based query expansion/relaxation
        })
        
        response_data = json.loads(result.content[0].text)
        print("Answer:", response_data.get("answer", "No answer found"))

await query_rewriting_example()


## 4. Assistant with Document Filtering

Use `document_name` to limit the search to a specific document within your knowledge base.


In [None]:
async def document_filtering_example():
    """Example with document filtering"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": "What are the safety protocols?",
            "document_name": "safety_manual.pdf"  # Limit search to specific document
        })
        
        response_data = json.loads(result.content[0].text)
        print("Answer:", response_data.get("answer", "No answer found"))

await document_filtering_example()


## 5. File Discovery Workflow

The `file_discover` tool helps you find relevant files before querying. This is useful when you want to:
1. Discover which files contain information about a topic
2. Query specific files using the `document_name` parameter


In [None]:
async def file_discovery_example():
    """Discover relevant files for a query"""
    async with ClientSession(SERVER_URL) as session:
        # Step 1: Discover relevant files
        result = await session.call_tool("file_discover", {
            "query": "employment policies",
            "resource_id": RESOURCE_ID,
            "top_k_recall": 100,  # Number of documents to retrieve
            "top_k_return": 20,   # Number of files to return
            "do_file_rerank": True,  # Enable reranking if available
            "relevance_score_threshold": 0.0
        })
        
        response_data = json.loads(result.content[0].text)
        
        if response_data.get("success"):
            files = response_data.get("distinct_files", [])
            print(f"Found {len(files)} relevant files:")
            for file in files[:10]:  # Show first 10 files
                print(f"  - {file}")
            
            # Step 2: Query a specific file
            if files:
                print(f"\nQuerying specific file: {files[0]}")
                answer_result = await session.call_tool("assistant", {
                    "resource_id": RESOURCE_ID,
                    "query": "What are the vacation policies?",
                    "document_name": files[0]  # Use file from discovery
                })
                
                answer_data = json.loads(answer_result.content[0].text)
                print("Answer:", answer_data.get("answer", "No answer found"))
        else:
            print("File discovery failed:", response_data.get("error", "Unknown error"))

await file_discovery_example()
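
The `top_k_recall` and `top_k_return` parameters suggest a recall-then-return pattern: retrieve many scored chunks, then collapse them into a shorter list of distinct files. The sketch below illustrates that general idea only. It is not KB-Bridge's implementation, and the chunk keys `file` and `score` are invented for the example.

```python
def rank_files(chunks, top_k_return=20, relevance_score_threshold=0.0):
    """Collapse scored chunks into a ranked list of distinct file names."""
    best = {}
    for chunk in chunks:
        score = chunk["score"]
        if score < relevance_score_threshold:
            continue  # drop chunks below the relevance cut-off
        name = chunk["file"]
        # Keep the best chunk score seen so far for each file.
        best[name] = max(score, best.get(name, score))
    ranked = sorted(best, key=lambda name: best[name], reverse=True)
    return ranked[:top_k_return]


# Hypothetical recalled chunks: two of them come from the same file.
chunks = [
    {"file": "a.pdf", "score": 0.9},
    {"file": "b.pdf", "score": 0.4},
    {"file": "a.pdf", "score": 0.2},
    {"file": "c.pdf", "score": 0.7},
]
print(rank_files(chunks, top_k_return=2))
```

Ranking each file by its best chunk is one design choice among several; averaging or summing chunk scores would favor files with many moderately relevant passages instead.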


## 6. List Files in Knowledge Base

The `file_lister` tool lists all files in a knowledge base resource with pagination support.


In [None]:
async def file_lister_example():
    """List files in a knowledge base"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("file_lister", {
            "resource_id": RESOURCE_ID,
            "timeout": 30,
            "limit": 50,  # Limit number of files returned
            "offset": 0   # Pagination offset
        })
        
        response_data = json.loads(result.content[0].text)
        
        if "files" in response_data:
            files = response_data["files"]
            print(f"Files returned: {len(files)}")
            print("\nFiles:")
            for file in files[:20]:  # Show first 20 files
                file_name = file.get("name", "Unknown")
                file_size = file.get("size", 0)
                print(f"  - {file_name} ({file_size} bytes)")
        else:
            print("Error:", response_data.get("error", "Unknown error"))

await file_lister_example()


## 7. File Count

Get a quick count of files in your knowledge base resource.


In [None]:
async def file_count_example():
    """Get file count in knowledge base"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("file_count", {
            "resource_id": RESOURCE_ID
        })
        
        response_data = json.loads(result.content[0].text)
        print(f"File count: {response_data.get('file_count', 0)}")
        print(f"Has files: {response_data.get('has_files', False)}")

await file_count_example()


## 8. Keyword Generation

The `keyword_generator` tool uses an LLM to generate sets of search keywords that can help improve search queries.


In [None]:
async def keyword_generator_example():
    """Generate keywords for a query"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("keyword_generator", {
            "query": "employee benefits and compensation",
            "max_sets": 5  # Number of keyword sets to generate
        })
        
        response_data = json.loads(result.content[0].text)
        
        if "keyword_sets" in response_data:
            print("Generated keyword sets:")
            for i, keyword_set in enumerate(response_data["keyword_sets"], 1):
                print(f"\nSet {i}:")
                for keyword in keyword_set:
                    print(f"  - {keyword}")
        else:
            print("Error:", response_data.get("error", "Unknown error"))

await keyword_generator_example()


## 9. Retriever Tool

The `retriever` tool provides low-level access to retrieval with various search methods and fine-grained control.


In [None]:
async def retriever_example():
    """Use retriever tool with different search methods"""
    async with ClientSession(SERVER_URL) as session:
        # Example: Hybrid search (semantic + keyword)
        result = await session.call_tool("retriever", {
            "resource_id": RESOURCE_ID,
            "query": "safety protocols",
            "search_method": "hybrid",  # Options: "hybrid", "semantic", "keyword", "full_text"
            "does_rerank": True,  # Enable reranking if available
            "top_k": 10,  # Number of results to return
            "score_threshold": 0.0,  # Minimum relevance score
            "weights": 0.5,  # Weight for hybrid search (0.0 = keyword, 1.0 = semantic)
            "document_name": "",  # Optional: filter by document
            "verbose": False
        })
        
        response_data = json.loads(result.content[0].text)
        
        if "documents" in response_data:
            documents = response_data["documents"]
            print(f"Retrieved {len(documents)} documents:")
            for i, doc in enumerate(documents[:5], 1):  # Show first 5
                print(f"\nDocument {i}:")
                print(f"  Title: {doc.get('title', 'Unknown')}")
                print(f"  Score: {doc.get('score', 0):.3f}")
                print(f"  Content preview: {doc.get('content', '')[:200]}...")
        else:
            print("Error:", response_data.get("error", "Unknown error"))

await retriever_example()
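
The `weights` comment above describes a scale from keyword-only (0.0) to semantic-only (1.0). One common way to realize such a scale is a linear blend of the two scores, sketched below together with `score_threshold` and `top_k` filtering. This is an illustration of the concept, not the formula KB-Bridge or its backend uses; `blend_scores` and the sample scores are hypothetical.

```python
def blend_scores(semantic, keyword, weight=0.5, score_threshold=0.0, top_k=10):
    """Blend per-document scores linearly and return the top_k (doc_id, score) pairs."""
    if not 0.0 <= weight <= 1.0:
        raise ValueError("weight must be between 0.0 and 1.0")
    doc_ids = set(semantic) | set(keyword)
    # A document missing from one result list contributes 0.0 for that side.
    blended = {
        doc_id: weight * semantic.get(doc_id, 0.0)
        + (1.0 - weight) * keyword.get(doc_id, 0.0)
        for doc_id in doc_ids
    }
    kept = [(d, s) for d, s in blended.items() if s >= score_threshold]
    # Sort by score descending, then by id so ties are deterministic.
    kept.sort(key=lambda pair: (-pair[1], pair[0]))
    return kept[:top_k]


semantic = {"a": 0.8, "b": 0.2}   # hypothetical semantic scores
keyword = {"b": 1.0, "c": 0.4}    # hypothetical keyword scores
print(blend_scores(semantic, keyword, weight=0.5))
```

Moving `weight` toward 1.0 lets document `a` overtake `b`, which is the effect to expect when tuning `weights` for queries that are phrased differently from the source text.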


## 10. Complete Workflow Example

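A complete workflow that chains keyword generation, file discovery, and the assistant. The keywords and discovered files are reported for inspection; the final assistant call searches the whole knowledge base.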


In [None]:
async def complete_workflow_example():
    """Complete workflow: generate keywords -> discover files -> query assistant"""
    async with ClientSession(SERVER_URL) as session:
        query = "employee vacation and leave policies"
        
        print("=" * 60)
        print("Complete Workflow Example")
        print("=" * 60)
        
        # Step 1: Generate keywords
        print("\n1. Generating keywords...")
        keyword_result = await session.call_tool("keyword_generator", {
            "query": query,
            "max_sets": 3
        })
        keyword_data = json.loads(keyword_result.content[0].text)
        if "keyword_sets" in keyword_data:
            print(f"   Generated {len(keyword_data['keyword_sets'])} keyword sets")
        
        # Step 2: Discover relevant files
        print("\n2. Discovering relevant files...")
        file_result = await session.call_tool("file_discover", {
            "query": query,
            "resource_id": RESOURCE_ID,
            "top_k_return": 10
        })
        file_data = json.loads(file_result.content[0].text)
        files = file_data.get("distinct_files", [])
        print(f"   Found {len(files)} relevant files")
        
        # Step 3: Query assistant with custom instructions
        print("\n3. Querying assistant...")
        answer_result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": query,
            "custom_instructions": "Provide a comprehensive answer with specific details. Cite sources.",
            "enable_query_rewriting": True
        })
        answer_data = json.loads(answer_result.content[0].text)
        
        print("\n" + "=" * 60)
        print("Answer:")
        print("=" * 60)
        print(answer_data.get("answer", "No answer found"))
        
        if "sources" in answer_data:
            print(f"\nSources ({len(answer_data['sources'])}):")
            for source in answer_data["sources"][:5]:
                print(f"  - {source.get('title', 'Unknown')}")

await complete_workflow_example()
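
To make the discovery step influence the final answer, the assistant can be called once per discovered file using the `document_name` parameter from section 4. The sketch below only builds the argument dictionaries. `build_assistant_requests` is a hypothetical helper, and it assumes the entries in `distinct_files` are plain file-name strings, as section 5 suggests.

```python
def build_assistant_requests(resource_id, query, files, max_files=3):
    """Return one `assistant` argument dict per discovered file (up to max_files)."""
    return [
        {"resource_id": resource_id, "query": query, "document_name": name}
        for name in files[:max_files]
    ]


requests = build_assistant_requests(
    "your-resource-id",
    "employee vacation and leave policies",
    ["handbook.pdf", "leave_policy.pdf"],  # hypothetical discovery output
)
for args in requests:
    print(args["document_name"])
```

Each dictionary can then be passed to `session.call_tool("assistant", args)`, and the per-file answers compared or merged.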


## 11. Error Handling

Here's how to handle errors gracefully when working with the KB-Bridge API:


In [None]:
async def error_handling_example():
    """Example of error handling"""
    async with ClientSession(SERVER_URL) as session:
        try:
            result = await session.call_tool("assistant", {
                "resource_id": "invalid-resource-id",
                "query": "test query"
            })
            
            response_data = json.loads(result.content[0].text)
            
            if "error" in response_data:
                print(f"Error occurred: {response_data['error']}")
                print(f"Message: {response_data.get('message', 'No message')}")
            else:
                print("Success:", response_data.get("answer", "No answer"))
                
        except Exception as e:
            print(f"Exception occurred: {type(e).__name__}: {e}")

await error_handling_example()
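
Every example in this notebook parses `result.content[0].text` as JSON, which raises if the response is empty or is not JSON. The sketch below wraps that step in a helper that returns either the parsed data or an error message. `parse_tool_result` is a hypothetical name, and the stand-in objects only mimic the result shape used in this notebook.

```python
import json
from types import SimpleNamespace


def parse_tool_result(result):
    """Return (data, error) from a tool result; exactly one of them is None."""
    content = getattr(result, "content", None) or []
    text = getattr(content[0], "text", None) if content else None
    if text is None:
        return None, "empty or non-text response"
    try:
        data = json.loads(text)
    except json.JSONDecodeError as exc:
        return None, f"response is not valid JSON: {exc}"
    # The examples above treat an "error" key as a failed call.
    if isinstance(data, dict) and "error" in data:
        return None, str(data["error"])
    return data, None


# Stand-in objects shaped like the results used in this notebook.
ok = SimpleNamespace(content=[SimpleNamespace(text='{"answer": "30 days"}')])
bad = SimpleNamespace(content=[SimpleNamespace(text="Internal Server Error")])
print(parse_tool_result(ok))
print(parse_tool_result(bad))
```

Returning a pair keeps the calling code flat: check `error` first, then read `data["answer"]` or `data["sources"]` as in the earlier examples.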


## Summary

This notebook demonstrated the key features of KB-Bridge:

- **Assistant Tool** - Intelligent Q&A with custom instructions and query rewriting  
- **File Discovery** - Find relevant files before querying  
- **Document Filtering** - Search within specific documents  
- **Keyword Generation** - Improve search queries with LLM-generated keywords  
- **Multiple Search Methods** - Hybrid, semantic, keyword, and full-text search  
- **Complete Workflows** - Combine tools for comprehensive knowledge extraction  

## Use Cases

- **Contract Analysis**: Analyze legal documents (see [ContractNLI](https://stanfordnlp.github.io/contract-nli/))
- **Document Q&A**: Answer questions about technical documentation and policies
- **Knowledge Base Search**: Search across large document collections

## Next Steps

- Explore the [full documentation](https://github.com/egpivo/kb-bridge)
- Check out the [test suite](https://github.com/egpivo/kb-bridge/tree/main/tests) for more examples
- Contribute on [GitHub](https://github.com/egpivo/kb-bridge)

## Resources

- **Installation**: `pip install kbbridge`
- **Server**: `python -m kbbridge.server`
- **Documentation**: See README.md in the repository
- **Issues**: [GitHub Issues](https://github.com/egpivo/kb-bridge/issues)

---

*Happy knowledge base searching!*
