# KB-Bridge Demo

KB-Bridge is an MCP server for intelligent knowledge base search with semantic search, query rewriting, and quality evaluation.

## Quick Start

1. Install: `pip install kbbridge fastmcp`
2. Configure: Create `.env` with backend credentials
3. Start server: Run `ensure_server_running()` in the next cell
4. Update `RESOURCE_ID` with your knowledge base ID

## Notebook Structure

**Setup** (Cells 1-5)
- Server management
- MCP client setup

**Quick Examples** (Cells 6-10)
- ContractNLI examples

**Core Tools** (Cells 11-35)
- `assistant` - Q&A with answers
- `file_discover` - Find relevant files
- `file_lister` / `file_count` - List files
- `keyword_generator` - Generate search keywords
- `retriever` - Low-level search access

**Advanced** (Cells 36-37)
- Complete workflow with visualization
- Error handling

## Links

- [GitHub](https://github.com/egpivo/kb-bridge) | [PyPI](https://pypi.org/project/kbbridge/)


In [None]:
# Import server management utilities
import sys
from pathlib import Path

# Add examples directory to path
cwd = Path.cwd()
if (cwd / 'examples' / 'utils.py').exists():
    sys.path.insert(0, str(cwd / 'examples'))
elif (cwd.parent / 'examples' / 'utils.py').exists():
    sys.path.insert(0, str(cwd.parent / 'examples'))

try:
    from utils import start_server, stop_server, show_logs, check_server_status, logs
    
    def ensure_server_running(port=5566):
        """Start server if not running"""
        if not check_server_status():
            start_server(port=port, kill_existing=True)
        else:
            print("Server already running")
    
    print("‚úì Server utilities loaded")
    print("Use: start_server(port=5566) or ensure_server_running()")
except ImportError:
    print("‚ö† Utils not available. Start server manually: python -m kbbridge.server --port 5566")


## Server Management

Start the server before running examples.


In [None]:
# Start server if not running
ensure_server_running(port=5566)


In [None]:
show_logs()

In [None]:
# Import MCP client and required libraries
import sys
from pathlib import Path
import json

# Add examples directory to path to import mcp_client
cwd = Path.cwd()
if (cwd / 'examples' / 'mcp_client.py').exists():
    sys.path.insert(0, str(cwd / 'examples'))
elif (cwd.parent / 'examples' / 'mcp_client.py').exists():
    sys.path.insert(0, str(cwd.parent / 'examples'))

try:
    from mcp_client import ClientSession
    print("‚úì MCP Client loaded")
except ImportError:
    print("‚ö† mcp_client.py not found. Install httpx: pip install httpx")
    raise

# Server configuration
SERVER_URL = "http://localhost:5566/mcp"
RESOURCE_ID = "bfa61dd2-3514-4768-9014-e30eecdaf00f"  # Replace with your actual resource ID

print(f"‚úì Server URL: {SERVER_URL}")
print(f"‚úì Resource ID: {RESOURCE_ID} (update this with your actual resource ID)")


## Quick Examples: ContractNLI

Examples from the [ContractNLI dataset](https://stanfordnlp.github.io/contract-nli/).

**Note**: Update `RESOURCE_ID` above with your knowledge base ID.


### Example 1: Non-Compete Clause


In [None]:
async def example_non_compete_clause():
    """Query about non-compete clause restrictions"""
    query = "Does the agreement include a non-compete clause restricting the employee from joining competitors?"
    
    custom_instructions = """
    Extract: time periods (e.g., "12 months"), geographic scope (e.g., "50-mile radius"), 
    scope of restriction, and any exceptions. Cite exact text from the document.
    """
    
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": query,
            "custom_instructions": custom_instructions
        })
        
        response_data = json.loads(result.content[0].text)
        
        if "answer" in response_data:
            print("Answer:", response_data["answer"])
            if "sources" in response_data:
                print(f"\nSources ({len(response_data['sources'])}):")
                for source in response_data["sources"][:3]:
                    print(f"  - {source.get('title', 'Unknown')} (score: {source.get('score', 0):.3f})")
        else:
            print("Error:", response_data.get("error", "Unknown error"))

await example_non_compete_clause()


### Example 2: Termination Notice


In [None]:
async def example_termination_notice():
    """Query about termination notice requirements with query rewriting"""
    query = "What is the notice period required for contract termination by either party?"
    
    async with ClientSession(SERVER_URL) as session:
        # Query with rewriting enabled for better search results
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": query,
            "enable_query_rewriting": True,
            "custom_instructions": "Extract exact notice period (duration, method, effective date). Cite contract language."
        })
        
        response_data = json.loads(result.content[0].text)
        
        if "answer" in response_data:
            print("Answer:", response_data["answer"])
            if "sources" in response_data:
                print(f"\nTop Sources:")
                for source in response_data["sources"][:3]:
                    print(f"  - {source.get('title', 'Unknown')} (score: {source.get('score', 0):.3f})")
        else:
            print("Error:", response_data.get("error", "Unknown error"))

await example_termination_notice()


## 1. Basic Query


In [None]:
async def basic_query_example():
    """Basic query example"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": "What are the safety protocols?"
        })
        
        # Parse the JSON response
        response_data = json.loads(result.content[0].text)
        print("Answer:", response_data.get("answer", "No answer found"))
        
        # Display sources if available
        if "sources" in response_data:
            print("\nSources:")
            for source in response_data["sources"][:5]:  # Show first 5 sources
                print(f"  - {source.get('title', 'Unknown')} (score: {source.get('score', 0):.3f})")

# Run the example
await basic_query_example()


## 2. Custom Instructions

Provide domain-specific guidance for answer extraction (useful for legal documents, contracts, etc.).

In [None]:
async def custom_instructions_example():
    """Example with custom instructions"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": "What is the maternity leave policy?",
            "custom_instructions": "Focus on HR compliance and legal requirements. Cite specific articles or sections."
        })
        
        response_data = json.loads(result.content[0].text)
        print("Answer:", response_data.get("answer", "No answer found"))

await custom_instructions_example()


## 3. Query Rewriting

Enable LLM-based query expansion for better search results.


In [None]:
async def query_rewriting_example():
    """Example with query rewriting enabled"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": "safety rules",
            "enable_query_rewriting": True  # Enables LLM-based query expansion/relaxation
        })
        
        response_data = json.loads(result.content[0].text)
        print("Answer:", response_data.get("answer", "No answer found"))

await query_rewriting_example()


## 4. Document Filtering

Limit search to a specific document using `document_name`.


In [None]:
async def document_filtering_example():
    """Example with document filtering"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": "What are the safety protocols?",
            "document_name": "safety_manual.pdf"  # Limit search to specific document
        })
        
        response_data = json.loads(result.content[0].text)
        print("Answer:", response_data.get("answer", "No answer found"))

await document_filtering_example()


## 5. File Discovery

Find relevant files before querying, then search specific files.


In [None]:
async def file_discovery_example():
    """Discover relevant files for a query"""
    async with ClientSession(SERVER_URL) as session:
        # Step 1: Discover relevant files
        result = await session.call_tool("file_discover", {
            "query": "employment policies",
            "resource_id": RESOURCE_ID,
            "top_k_recall": 100,  # Number of documents to retrieve
            "top_k_return": 20,   # Number of files to return
            "do_file_rerank": True,  # Enable reranking if available
            "relevance_score_threshold": 0.0
        })
        
        response_data = json.loads(result.content[0].text)
        
        if response_data.get("success"):
            files = response_data.get("distinct_files", [])
            print(f"Found {len(files)} relevant files:")
            for file in files[:10]:  # Show first 10 files
                print(f"  - {file}")
            
            # Step 2: Query a specific file
            if files:
                print(f"\nQuerying specific file: {files[0]}")
                answer_result = await session.call_tool("assistant", {
                    "resource_id": RESOURCE_ID,
                    "query": "What are the vacation policies?",
                    "document_name": files[0]  # Use file from discovery
                })
                
                answer_data = json.loads(answer_result.content[0].text)
                print("Answer:", answer_data.get("answer", "No answer found"))
        else:
            print("File discovery failed:", response_data.get("error", "Unknown error"))

await file_discovery_example()


## 6. List Files

List all files in your knowledge base (with pagination).


In [None]:
async def file_lister_example():
    """List files in a knowledge base"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("file_lister", {
            "resource_id": RESOURCE_ID,
            "timeout": 30,
            "limit": 50,  # Limit number of files returned
            "offset": 0   # Pagination offset
        })
        
        response_data = json.loads(result.content[0].text)
        
        if "files" in response_data:
            files = response_data["files"]
            print(f"Total files: {len(files)}")
            print("\nFiles:")
            # Files are returned as a list of strings (file names)
            for file_name in files[:20]:  # Show first 20 files
                # Handle both string and dict formats for compatibility
                if isinstance(file_name, str):
                    print(f"  - {file_name}")
                elif isinstance(file_name, dict):
                    name = file_name.get("name", "Unknown")
                    size = file_name.get("size", 0)
                    print(f"  - {name} ({size} bytes)")
                else:
                    print(f"  - {file_name}")
        else:
            print("Error:", response_data.get("error", "Unknown error"))

await file_lister_example()


## 7. File Count

Get the total number of files in your knowledge base.


In [None]:
async def file_count_example():
    """Get file count in knowledge base"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("file_count", {
            "resource_id": RESOURCE_ID
        })
        
        response_data = json.loads(result.content[0].text)
        print(f"File count: {response_data.get('file_count', 0)}")
        print(f"Has files: {response_data.get('has_files', False)}")

await file_count_example()


## 8. Keyword Generation

Generate search keywords using LLM to improve queries.


In [None]:
async def keyword_generator_example():
    """Generate keywords for a query"""
    async with ClientSession(SERVER_URL) as session:
        result = await session.call_tool("keyword_generator", {
            "query": "employee benefits and compensation",
            "max_sets": 5  # Number of keyword sets to generate
        })
        
        response_data = json.loads(result.content[0].text)
        
        if "keyword_sets" in response_data:
            print("Generated keyword sets:")
            for i, keyword_set in enumerate(response_data["keyword_sets"], 1):
                print(f"\nSet {i}:")
                for keyword in keyword_set:
                    print(f"  - {keyword}")
        else:
            print("Error:", response_data.get("error", "Unknown error"))

await keyword_generator_example()


## 9. Retriever Tool

Low-level access to raw search results (no answer synthesis).

**When to use:**
- `assistant` ‚Üí Synthesized answers with citations
- `retriever` ‚Üí Raw search chunks for custom processing


In [None]:
async def retriever_example():
    """Use retriever tool for raw search results (no answer synthesis)."""
    async with ClientSession(SERVER_URL) as session:
        # Example: Hybrid search (semantic + keyword)
        # Returns raw chunks: {"result": [{"content": "...", "document_name": "..."}]}
        result = await session.call_tool("retriever", {
            "resource_id": RESOURCE_ID,
            "query": "What obligations remain in effect after the NDA expires, specifically regarding return or destruction of confidential information and survival of obligations for clinical trial data at University of Michigan?",
            "search_method": "hybrid_search",  # Options: "hybrid_search", "semantic_search", "keyword_search", "full_text_search", "vector_search"
            "does_rerank": True,  # Enable reranking if available
            "top_k": 10,  # Number of results to return
            "verbose": False
        })
        
        response_data = json.loads(result.content[0].text)
        
        # The retriever returns {"result": [{"content": "...", "document_name": "..."}]}
        if "result" in response_data:
            results = response_data["result"]
            print(f"Retrieved {len(results)} results:")
            for i, item in enumerate(results[:5], 1):  # Show first 5
                print(f"\nResult {i}:")
                document_name = item.get("document_name", "Unknown")
                content = item.get("content", "")
                print(f"  Document: {document_name}")
                print(f"  Content preview: {content[:200]}..." if len(content) > 200 else f"  Content: {content}")
        elif "error" in response_data:
            print(f"Error: {response_data['error']}")
            # Show format_error if available (from formatting issues)
            if "format_error" in response_data:
                print(f"Format error: {response_data['format_error']}")
        else:
            print("Error: Unknown error")
            print(f"Response keys: {list(response_data.keys())}")
            print(f"Response: {json.dumps(response_data, indent=2)[:500]}")

await retriever_example()


## 10. Complete Workflow

End-to-end example: keyword generation ‚Üí file discovery ‚Üí answer extraction ‚Üí visualization.


In [None]:
# Import visualization libraries
try:
    import matplotlib.pyplot as plt
    from matplotlib.patches import FancyBboxPatch
    MATPLOTLIB_AVAILABLE = True
except ImportError:
    MATPLOTLIB_AVAILABLE = False
    print("‚ö†Ô∏è  matplotlib not available. Install with: pip install matplotlib")
    print("   Visualization will be skipped, but workflow will still run.")


def _is_valid_answer(answer_text):
    """Check if answer text is valid and meaningful."""
    if not answer_text or not answer_text.strip():
        return False
    upper_text = answer_text.strip().upper()
    return upper_text not in ['N/A', 'N/A - NO RELEVANT INFORMATION FOUND', '']


def _get_sources_count(answer_data):
    """Extract sources count from answer data, checking multiple locations."""
    # Try total_sources first
    sources_count = answer_data.get('total_sources', 0)
    if sources_count > 0:
        return sources_count
    
    # Check reflection metadata
    reflection = answer_data.get('reflection', {})
    if isinstance(reflection, dict):
        sources_count = reflection.get('sources_count', 0)
        if sources_count > 0:
            return sources_count
        
        # If quality_score exists, sources were evaluated
        if reflection.get('quality_score', 0) > 0:
            # Try structured_answer
            structured_answer = answer_data.get('structured_answer', {})
            if isinstance(structured_answer, dict):
                structured_sources = structured_answer.get('sources', [])
                if structured_sources:
                    return len(structured_sources)
            # At least one source was used if quality_score exists
            return 1
    
    # If answer exists, at least one source was likely used
    answer_text = answer_data.get('answer', '') or answer_data.get('text_summary', '')
    if _is_valid_answer(answer_text):
        return 1
    
    return 0


def _check_advanced_search_success(answer_data):
    """Determine if advanced search was successful."""
    total_sources = answer_data.get('total_sources', 0)
    reflection = answer_data.get('reflection', {})
    
    if total_sources > 0:
        return True
    
    if isinstance(reflection, dict):
        # If quality_score exists, sources were evaluated
        if reflection.get('quality_score', 0) > 0:
            return True
        # Check explicit sources_count
        if reflection.get('sources_count', 0) > 0:
            return True
    
    # If answer exists, at least one search method succeeded
    answer_text = answer_data.get('answer', '') or answer_data.get('text_summary', '')
    return _is_valid_answer(answer_text)


def visualize_workflow_flow(answer_data, file_data=None, keyword_data=None):
    """
    Visualize the KB-Bridge workflow as a flowchart showing information flow.
    
    Shows:
    - Query input
    - Keyword generation (if used)
    - File discovery
    - Direct search path
    - Advanced search path
    - Answer extraction
    - Reflection/evaluation
    - Final answer output
    
    Args:
        answer_data: Response data from assistant tool
        file_data: Optional file discovery results
        keyword_data: Optional keyword generation results
    """
    if not MATPLOTLIB_AVAILABLE:
        raise ImportError("matplotlib is required for visualization")
    
    fig, ax = plt.subplots(1, 1, figsize=(14, 10))
    ax.set_xlim(0, 10)
    ax.set_ylim(0, 12)
    ax.axis('off')
    
    # Color scheme
    colors = {
        'query': '#4A90E2',
        'process': '#7ED321',
        'search': '#F5A623',
        'success': '#50E3C2',
        'fail': '#D0021B',
        'output': '#9013FE'
    }
    
    y_pos = 11
    x_center = 5
    
    # Helper function to draw boxes
    def draw_box(x, y, width, height, color, text, fontsize=10, bold=False):
        box = FancyBboxPatch((x-width/2, y-height/2), width, height,
                             boxstyle="round,pad=0.1", facecolor=color,
                             edgecolor='black', linewidth=1.5)
        ax.add_patch(box)
        weight = 'bold' if bold else 'normal'
        ax.text(x, y, text, ha='center', va='center',
                fontsize=fontsize, fontweight=weight, color='white' if color == colors['output'] else 'black')
    
    def draw_arrow(x, y, dx, dy, style='-'):
        ax.arrow(x, y, dx, dy, head_width=0.15, head_length=0.08,
                 fc='black', ec='black', linestyle=style)
    
    # 1. Query Input
    draw_box(x_center, y_pos, 4, 0.8, colors['query'], 'User Query', fontsize=12, bold=True)
    y_pos -= 1.5
    draw_arrow(x_center, y_pos + 0.3, 0, -0.3)
    y_pos -= 0.5
    
    # 2. Keyword Generation (if used)
    if keyword_data and keyword_data.get('keyword_sets'):
        kw_count = len(keyword_data.get('keyword_sets', []))
        draw_box(x_center, y_pos, 4, 0.7, colors['process'],
                f'Keyword Generation\n({kw_count} sets)', fontsize=10, bold=True)
        y_pos -= 1.2
        draw_arrow(x_center, y_pos + 0.2, 0, -0.2)
        y_pos -= 0.3
    
    # 3. File Discovery
    if file_data:
        file_count = len(file_data.get('distinct_files', []))
        draw_box(x_center, y_pos, 4, 0.7, colors['process'],
                f'File Discovery\n({file_count} files found)', fontsize=10, bold=True)
        y_pos -= 1.2
        draw_arrow(x_center, y_pos + 0.2, 0, -0.2)
        y_pos -= 0.3
    
    # 4. Search Approaches (Parallel)
    y_search = y_pos - 0.5
    
    # Direct Search
    answer_text = answer_data.get('answer', '') or answer_data.get('text_summary', '')
    direct_success = _is_valid_answer(answer_text)
    direct_color = colors['success'] if direct_success else colors['fail']
    direct_status = "[OK] Success" if direct_success else "[FAIL] No Results"
    draw_box(2.5, y_search, 3, 1, direct_color,
            f'Direct Search\n{direct_status}', fontsize=9, bold=True)
    
    # Advanced Search
    has_advanced = _check_advanced_search_success(answer_data)
    advanced_color = colors['success'] if has_advanced else colors['fail']
    advanced_status = "[OK] Success" if has_advanced else "[PARTIAL] Failed"
    draw_box(7.5, y_search, 3, 1, advanced_color,
            f'Advanced Search\n{advanced_status}', fontsize=9, bold=True)
    
    # Arrows from file discovery to both searches
    draw_arrow(x_center, y_pos + 0.2, -1.5, -0.8, style='--')
    draw_arrow(x_center, y_pos + 0.2, 1.5, -0.8, style='--')
    
    y_pos = y_search - 1.5
    
    # 5. Answer Extraction & Synthesis
    sources_count = _get_sources_count(answer_data)
    draw_box(x_center, y_pos, 5, 0.7, colors['search'],
            f'Answer Extraction & Synthesis\n({sources_count} sources)',
            fontsize=10, bold=True)
    y_pos -= 1.2
    draw_arrow(x_center, y_pos + 0.2, 0, -0.2)
    y_pos -= 0.3
    
    # 6. Reflection/Quality Evaluation (if enabled)
    if 'reflection' in answer_data:
        reflection = answer_data.get('reflection', {})
        quality_score = reflection.get('quality_score', 0)
        passed = reflection.get('passed', False)
        ref_color = colors['success'] if passed else colors['fail']
        status_text = "[PASS]" if passed else "[FAIL]"
        draw_box(x_center, y_pos, 5, 0.7, ref_color,
                f'{status_text} Reflection & Quality Check\n(Score: {quality_score:.2f})',
                fontsize=10, bold=True)
        y_pos -= 1.2
        draw_arrow(x_center, y_pos + 0.2, 0, -0.2)
        y_pos -= 0.3
    
    # 7. Final Answer Output
    if not answer_text or answer_text.strip().upper() in ['N/A', 'N/A - NO RELEVANT INFORMATION FOUND']:
        answer_text = 'No answer found'
    answer_preview = answer_text[:50] + "..." if len(answer_text) > 50 else answer_text
    draw_box(x_center, y_pos, 5, 0.8, colors['output'],
            f'Final Answer\n"{answer_preview}"', fontsize=10, bold=True)
    
    # Title
    ax.text(x_center, 11.8, 'KB-Bridge Information Flow',
            ha='center', va='center', fontsize=16, fontweight='bold')
    
    plt.tight_layout()
    return fig

def _display_reflection_analysis(reflection, answer):
    """Display detailed reflection analysis when quality is low."""
    quality_score = reflection.get("quality_score")
    threshold = reflection.get("threshold", 0.7)
    passed = reflection.get("passed", True)
    
    # Determine analysis type
    if answer == "N/A - No relevant information found":
        title = "üîç Reflection Analysis: Why No Results Were Found"
    else:
        title = "üîç Reflection Analysis: Why Quality Score is Low"
    
    print("\n" + "‚îÄ" * 60)
    print(title)
    print("‚îÄ" * 60)
    
    if quality_score is not None:
        print(f"\nüìä Quality Assessment:")
        print(f"   Quality Score: {quality_score:.2f} / {threshold:.2f}")
        print(f"   Status: {'‚úÖ Passed' if passed else '‚ùå Below threshold'}")
        
        # Show confidence level interpretation
        confidence_level = reflection.get("confidence_level", "")
        if confidence_level:
            confidence_map = {
                "high": "‚úÖ High confidence - answer is reliable",
                "medium": "‚ö†Ô∏è  Medium confidence - answer may need verification",
                "low": "‚ö†Ô∏è  Low confidence - answer quality is below acceptable threshold",
                "very_low": "‚ùå Very low confidence - answer is likely incorrect or incomplete"
            }
            print(f"   Confidence: {confidence_map.get(confidence_level, confidence_level)}")
    
    # Show detailed feedback
    feedback = reflection.get("feedback", "")
    if feedback:
        print(f"\nüí° Reflection Feedback:")
        print(f"   {feedback}")
    
    # Show detailed scores breakdown
    scores = reflection.get("scores", {})
    if scores:
        print(f"\nüìà Detailed Quality Scores:")
        for metric, score in scores.items():
            print(f"   {metric.capitalize()}: {score:.2f}")
    
    # Show recommendations if available
    recommendation = reflection.get("recommendation", "")
    if recommendation:
        print(f"\nüí° Recommendation:")
        print(f"   {recommendation}")
    
    # Show re-extraction suggestion if available
    if reflection.get("re_extraction_recommended"):
        candidates_count = reflection.get("re_extraction_candidates", 0)
        print(f"\nüîÑ Re-extraction Opportunity:")
        print(f"   {candidates_count} candidates have segments but extraction failed.")
        print(f"   Re-extraction with improved parameters may help.")


async def complete_workflow_example(reflection_threshold=0.5):
    """
    Complete workflow: keyword generation ‚Üí file discovery ‚Üí answer extraction ‚Üí visualization.
    
    Args:
        reflection_threshold: Quality threshold (0-1). Default 0.5. Lower = more lenient.
    """
    async with ClientSession(SERVER_URL) as session:
        query = "What obligations remain in effect after the NDA expires, specifically regarding return or destruction of confidential information and survival of obligations for clinical trial data at University of Michigan?"
        
        print("=" * 60)
        print("üîÑ Complete Workflow Example")
        print(f"   Reflection Threshold: {reflection_threshold}")
        print("=" * 60)
        
        # Step 1: Generate keywords
        print("\n1Ô∏è‚É£ Generating keywords...")
        keyword_result = await session.call_tool("keyword_generator", {
            "query": query,
            "max_sets": 3
        })
        keyword_data = json.loads(keyword_result.content[0].text)
        if "keyword_sets" in keyword_data:
            print(f"   ‚úÖ Generated {len(keyword_data['keyword_sets'])} keyword sets")
        
        # Step 2: Discover relevant files
        print("\n2Ô∏è‚É£ Discovering relevant files...")
        file_result = await session.call_tool("file_discover", {
            "query": query,
            "resource_id": RESOURCE_ID,
            "top_k_return": 10
        })
        file_data = json.loads(file_result.content[0].text)
        files = file_data.get("distinct_files", [])
        print(f"   ‚úÖ Found {len(files)} relevant files")
        
        # Step 3: Query assistant with reflection enabled
        print("\n3Ô∏è‚É£ Querying assistant (with reflection and verbose mode)...")
        answer_result = await session.call_tool("assistant", {
            "resource_id": RESOURCE_ID,
            "query": query,
            "custom_instructions": "Provide a comprehensive answer with specific details. Cite sources.",
            "enable_query_rewriting": True,
            "enable_reflection": True,
            "reflection_threshold": reflection_threshold,
            "verbose": True
        })
        answer_data = json.loads(answer_result.content[0].text)
        
        # Display final answer
        print("\n" + "=" * 60)
        print("üì§ Final Answer")
        print("=" * 60)
        answer = answer_data.get("answer", answer_data.get("text_summary", "No answer found"))
        print(answer)
        
        # Show reflection insights when quality is low or answer indicates no results
        if "reflection" in answer_data:
            reflection = answer_data.get("reflection", {})
            quality_score = reflection.get("quality_score")
            passed = reflection.get("passed", True)
            
            # Show analysis if quality is low or answer indicates no results
            # Use reflection_threshold instead of hardcoded 0.7
            threshold = reflection.get("threshold", reflection_threshold)
            if not passed or (quality_score is not None and quality_score < threshold) or answer == "N/A - No relevant information found":
                _display_reflection_analysis(reflection, answer)
        
        # Show sources if available
        if "sources" in answer_data and answer_data["sources"]:
            print(f"\nüìö Sources ({len(answer_data['sources'])}):")
            for source in answer_data["sources"][:5]:
                print(f"   ‚Ä¢ {source.get('title', 'Unknown')}")
        
        # Visualize the workflow flow
        print("\n" + "=" * 60)
        print("üìä Workflow Visualization")
        print("=" * 60)
        print("\nGenerating information flow diagram...")
        print("\nüí° Note: Advanced search may show as 'Failed' even when direct search succeeds.")
        print("   This happens because:")
        print("   - Advanced search processes files individually with per-file top_k limits")
        print("   - Segments might rank differently within files vs. across all files")
        print("   - Direct search aggregates results across all files (more forgiving)")
        print("   - KB-Bridge automatically uses the best results from either approach")
        
        # Generate visualization if matplotlib is available
        if MATPLOTLIB_AVAILABLE:
            try:
                fig = visualize_workflow_flow(answer_data, file_data, keyword_data)
                plt.show()
                print("\n‚úÖ Workflow diagram displayed above")
            except Exception as e:
                print(f"\n‚ö†Ô∏è  Could not generate visualization: {e}")
                print("   (This is optional - the workflow still completed successfully)")
        else:
            print("\n‚ö†Ô∏è  Visualization skipped (matplotlib not installed)")
            print("   Install with: pip install matplotlib")
            print("\nüìã Text Summary of Workflow:")
            print("   1. User Query ‚Üí Keyword Generation ‚Üí File Discovery")
            print("   2. Parallel Search: Direct Search + Advanced Search")
            print("   3. Answer Extraction & Synthesis")
            if 'reflection' in answer_data:
                print("   4. Reflection & Quality Check")
            print("   5. Final Answer Output")

# Run with lower threshold (0.5) so scores like 0.51 will pass and show green
await complete_workflow_example()

# To use stricter threshold, uncomment:
# await complete_workflow_example(reflection_threshold=0.7)


In [None]:
show_logs()

## 11. Error Handling

Handle errors gracefully when working with the API.


In [None]:
async def error_handling_example():
    """Example of error handling"""
    async with ClientSession(SERVER_URL) as session:
        try:
            result = await session.call_tool("assistant", {
                "resource_id": "invalid-resource-id",
                "query": "test query"
            })
            
            response_data = json.loads(result.content[0].text)
            
            if "error" in response_data:
                print(f"Error occurred: {response_data['error']}")
                print(f"Message: {response_data.get('message', 'No message')}")
            else:
                print("Success:", response_data.get("answer", "No answer"))
                
        except Exception as e:
            print(f"Exception occurred: {type(e).__name__}: {e}")

await error_handling_example()
