# CollabSearch Framework
## Comprehensive Code Structure and Implementation Guide

## Executive Summary

CollabSearch (Collaborative Analysis) is a multi-agent framework for AI orchestration, intent disambiguation, and collaborative reasoning. It combines retrieval-augmented generation (RAG), dynamic role-based prompting, and a multi-stage processing pipeline to route each query to a suitable synthesis method and produce a response that fits the user's clarified intent.


## Install necessary libraries

In [None]:
!pip install "openai>=1.0.0" geotext transformers
!pip install -q -U google-genai
!pip install langchain openai google-api-python-client langchain_community tools langchain_google_community langchain_openai
!pip install --upgrade openai
!pip install gradio openpyxl


### Import Libraries

In [None]:
import os
import re
import logging
import csv
import time
import json
import gradio as gr
import pandas as pd
from datetime import datetime
from openai import OpenAI
from langchain_google_community import GoogleSearchAPIWrapper
from langchain_openai import ChatOpenAI
from langchain.agents import Tool, AgentExecutor, create_tool_calling_agent
from langchain.prompts import ChatPromptTemplate

### Set Environment Variables



In [None]:
os.environ["IDA_LLM_API_KEY"] = "your_api_key_here"
os.environ["GOOGLE_API_KEY"] = "your_google_api_key_here"
os.environ["GOOGLE_CSE_ID"] = "your_cse_id_here"
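The values above are placeholders, so a quick check before any client is created can save a confusing authentication error later. A minimal sketch, assuming the three variable names used in this notebook and that every placeholder starts with `your_`; the `find_unset_keys` helper is illustrative and not part of the notebook:

```python
# Names taken from the cell above; the helper itself is a hypothetical addition.
REQUIRED_KEYS = ("IDA_LLM_API_KEY", "GOOGLE_API_KEY", "GOOGLE_CSE_ID")

def find_unset_keys(environ):
    """Return required variable names that are missing, empty, or still placeholders."""
    unset = []
    for key in REQUIRED_KEYS:
        value = environ.get(key, "")
        if not value or value.startswith("your_"):
            unset.append(key)
    return unset

# Demo with a plain dict; in the notebook, pass os.environ instead.
example_env = {
    "IDA_LLM_API_KEY": "abc123",
    "GOOGLE_API_KEY": "your_google_api_key_here",
}
print(find_unset_keys(example_env))
```

In the demo, the second key is still a placeholder and the third is absent, so both are reported.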

In [None]:
# Initialize your private LLM client (llama-3-8b-instruct via university server)
client = OpenAI(
    base_url="http://api.llm.apps.os.dcs.gla.ac.uk/v1",
    api_key=os.environ['IDA_LLM_API_KEY']
)

## Database Management and State Persistence

### Comprehensive Schema Design

**Database Architecture Insights:**
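1. **Comprehensive Tracking**: Each processing stage writes its output to a dedicated column
2. **Performance Monitoring**: Pipeline and RAG processing times are stored in separate fields
3. **Source Attribution**: RAG source URLs are preserved alongside the answer
4. **Process Lineage**: The original query, the rewritten query, and every intermediate analysis are kept in the same row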

In [None]:
def initialize_database_with_sources():
    '''
    Initialize CSV database for COLA system tracking with predefined schema.
    Creates empty database file with columns for query processing, routing, analysis results, and performance metrics.
    '''
    try:
        if not os.path.exists('cola_database.csv'):
            columns = [
                'ID', 'Original_Query', 'Rewritten_Query', 'Selected_Topic_Intent', 'Selected_Answer_Type',
                'Routing_Type', 'Routing_Method', 'Dictionary_Match', 'Identified_Topic', 'Assigned_Roles',
                'Linguist_Analysis', 'Expert_Analysis', 'User_Analysis', 'In_Favor', 'Against',
                'Final_Judgement', 'Synthesis_Method', 'After_RAG_Agent', 'RAG_Source_URLs',
                'Processing_Time_Seconds', 'Processing_Time_Seconds_RAG', 'Timestamp', 'Status'
            ]
            df = pd.DataFrame(columns=columns)

            # Comprehensive dtype mapping
            dtype_dict = {
                'ID': 'int64',
                'Original_Query': 'string',
                'Rewritten_Query': 'string',
                'Selected_Topic_Intent': 'string',
                'Selected_Answer_Type': 'string',
                'Routing_Type': 'string',
                'Routing_Method': 'string',
                'Dictionary_Match': 'string',
                'Identified_Topic': 'string',
                'Assigned_Roles': 'string',
                'Linguist_Analysis': 'string',
                'Expert_Analysis': 'string',
                'User_Analysis': 'string',
                'In_Favor': 'string',
                'Against': 'string',
                'Final_Judgement': 'string',
                'Synthesis_Method': 'string',
                'After_RAG_Agent': 'string',
                'RAG_Source_URLs': 'string',
                'Processing_Time_Seconds': 'float64',
                'Processing_Time_Seconds_RAG': 'float64',
                'Timestamp': 'string',
                'Status': 'string'
            }

            df = df.astype(dtype_dict)
            df.to_csv('cola_database.csv', index=False)
            print("✅ Enhanced COLA database with comprehensive tracking initialized")
        else:
            print("✅ Database already exists")
    except Exception as e:
        print(f"❌ Error initializing database: {e}")
        # Create minimal fallback
        pd.DataFrame(columns=['ID', 'Original_Query', 'Status']).to_csv('cola_database.csv', index=False)

In [None]:
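def add_new_query(query_id, query_text):
    '''
    Add new query entry to CSV database with initial default values.
    Creates new row with query ID and text, initializes empty fields for tracking processing stages.
    '''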
    try:
        if os.path.exists('cola_database.csv'):
            df = pd.read_csv('cola_database.csv')
        else:
            initialize_database_with_sources()
            df = pd.read_csv('cola_database.csv')

        # Create new row data that matches the DataFrame structure exactly
        new_row_data = {
            'ID': int(query_id),
            'Original_Query': str(query_text),
            'Rewritten_Query': '',
            'Selected_Topic_Intent': '',
            'Selected_Answer_Type': '',
            'Routing_Type': '',
            'Routing_Method': '',
            'Dictionary_Match': '',
            'Identified_Topic': '',
            'Assigned_Roles': '',
            'Linguist_Analysis': '',
            'Expert_Analysis': '',
            'User_Analysis': '',
            'In_Favor': '',
            'Against': '',
            'Final_Judgement': '',
            'Synthesis_Method': '',
            'After_RAG_Agent': '',
            'RAG_Source_URLs': '',
            'Processing_Time_Seconds': None,
            'Processing_Time_Seconds_RAG': None,
            'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'Status': 'pending'
        }

        if not df.empty:
            # Make sure new row has exact same columns as existing DataFrame
            for col in df.columns:
                if col not in new_row_data:
                    new_row_data[col] = None if df[col].dtype in ['float64', 'int64'] else ''

            # Create new row with matching column order
            new_row = pd.DataFrame([new_row_data], columns=df.columns)
        else:
            new_row = pd.DataFrame([new_row_data])

        df = pd.concat([df, new_row], ignore_index=True, sort=False)
        df.to_csv('cola_database.csv', index=False)
        print(f"✅ Added query {query_id} to database")

    except Exception as e:
        print(f"❌ Error adding query to database: {e}")
        import traceback
        traceback.print_exc()
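`add_new_query` rereads and rewrites the whole CSV file for every new query. The notebook imports `csv` but does not use it in this section; a minimal sketch of an append-only alternative with the standard library follows. The reduced four-column `FIELDNAMES` list, the helper names, and the temporary demo file are assumptions for illustration, not the notebook's implementation:

```python
import csv
import os
import tempfile
from datetime import datetime

# Hypothetical reduced schema; the notebook's real schema has 23 columns.
FIELDNAMES = ["ID", "Original_Query", "Timestamp", "Status"]

def append_query_row(path, query_id, query_text):
    """Append one pending row, writing the header only if the file is new or empty."""
    is_new = not os.path.exists(path) or os.path.getsize(path) == 0
    with open(path, "a", newline="", encoding="utf-8") as handle:
        writer = csv.DictWriter(handle, fieldnames=FIELDNAMES)
        if is_new:
            writer.writeheader()
        writer.writerow({
            "ID": int(query_id),
            "Original_Query": str(query_text),
            "Timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "Status": "pending",
        })

def read_rows(path):
    """Load all rows back as a list of dicts (every value is a string)."""
    with open(path, newline="", encoding="utf-8") as handle:
        return list(csv.DictReader(handle))

# Demo against a temporary file rather than cola_database.csv.
demo_path = os.path.join(tempfile.mkdtemp(), "demo_database.csv")
append_query_row(demo_path, 1, "What is Java?")
append_query_row(demo_path, 2, "Tell me about Python, please")
rows = read_rows(demo_path)
print([row["ID"] for row in rows])
```

Appending avoids loading existing rows, but later updates to a row (as `update_comprehensive_results` performs) would still require a rewrite, so the pandas approach may remain the simpler choice for small files.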

In [None]:
def update_comprehensive_results(query_id, results_dict):
    '''
    Update an existing query row in the CSV database with processing results.
    Writes each key in results_dict to its matching column, converts timing fields to float, and marks the row as completed.
    '''
    try:
        df = pd.read_csv('cola_database.csv')
        mask = df['ID'] == query_id

        if mask.any():
            for column, value in results_dict.items():
                if column in df.columns:
                    if column in ['Processing_Time_Seconds', 'Processing_Time_Seconds_RAG']:
                        try:
                            df.loc[mask, column] = float(value) if value is not None else None
                        except (ValueError, TypeError):
                            df.loc[mask, column] = None
                    else:
                        df.loc[mask, column] = str(value) if value is not None else ''

            df.loc[mask, 'Status'] = 'completed'
            df.to_csv('cola_database.csv', index=False)
            print(f"✅ Updated comprehensive results for query {query_id}")
        else:
            print(f"⚠️ Query ID {query_id} not found in database")

    except Exception as e:
        print(f"❌ Error updating results: {e}")

In [None]:
def view_comprehensive_database_stats():
    '''
    Display comprehensive database statistics and analysis.
    Provides detailed breakdown of query status, routing patterns, processing times, topic distribution, and recent entries.
    '''
    try:
        df = pd.read_csv('cola_database.csv')

        total_queries = len(df)
        completed = len(df[df['Status'] == 'completed'])
        pending = len(df[df['Status'] == 'pending'])
        errors = len(df[df['Status'] == 'error'])

        # Enhanced routing statistics
        routing_stats = ""
        synthesis_stats = ""
        dictionary_stats = ""

        if 'Routing_Type' in df.columns and completed > 0:
            completed_df = df[df['Status'] == 'completed']

            # Routing type distribution
            if 'Routing_Type' in completed_df.columns:
                routing_dist = completed_df['Routing_Type'].value_counts()
                routing_stats = f"\n📊 Routing Distribution:"
                for route_type, count in routing_dist.items():
                    if pd.notna(route_type):  # Skip NaN values
                        routing_stats += f"\n   - {route_type}: {count}"

            # Routing method breakdown
            if 'Routing_Method' in completed_df.columns:
                method_dist = completed_df['Routing_Method'].value_counts()
                routing_stats += f"\n🔍 Method Breakdown:"
                for method, count in method_dist.items():
                    if pd.notna(method):  # Skip NaN values
                        routing_stats += f"\n   - {method}: {count}"

            # Dictionary match analysis
            if 'Dictionary_Match' in completed_df.columns:
                dict_matches = completed_df['Dictionary_Match'].value_counts()
                dictionary_stats = f"\n🔑 Dictionary Matches:"
                for match, count in dict_matches.items():
                    if pd.notna(match) and str(match).strip():  # Skip NaN and empty values; str() guards against non-string cells
                        dictionary_stats += f"\n   - '{match}': {count}"

            # Synthesis method distribution
            if 'Synthesis_Method' in completed_df.columns:
                synthesis_dist = completed_df['Synthesis_Method'].value_counts()
                synthesis_stats = f"\n⚙️ Synthesis Methods:"
                for method, count in synthesis_dist.items():
                    if pd.notna(method):  # Skip NaN values
                        synthesis_stats += f"\n   - {method}: {count}"

        # Processing time analysis
        time_stats = ""
        if 'Processing_Time_Seconds' in df.columns and completed > 0:
            completed_df = df[df['Status'] == 'completed']
            processing_times = completed_df['Processing_Time_Seconds'].dropna()
            if not processing_times.empty:
                avg_time = processing_times.mean()
                min_time = processing_times.min()
                max_time = processing_times.max()
                time_stats = f"\n⏱️ Processing Times:\n   - Average: {avg_time:.2f}s\n   - Minimum: {min_time:.2f}s\n   - Maximum: {max_time:.2f}s"

        # Topic analysis
        topic_stats = ""
        if 'Identified_Topic' in df.columns and completed > 0:
            completed_df = df[df['Status'] == 'completed']
            topic_dist = completed_df['Identified_Topic'].value_counts()
            topic_stats = f"\n🏷️ Topics Identified:"
            for topic, count in topic_dist.head(5).items():  # Top 5 topics
                if pd.notna(topic):
                    topic_stats += f"\n   - {topic}: {count}"

        # Complete stats output with proper formatting
        stats = f"""
📈 COMPREHENSIVE DATABASE STATISTICS
=====================================
📊 Query Status:
  - Total Queries: {total_queries}
  - Completed: {completed}
  - Pending: {pending}
  - Errors: {errors}
{time_stats}{routing_stats}{dictionary_stats}{synthesis_stats}{topic_stats}

🗂️ Database Schema: {len(df.columns)} columns
📋 Available Columns: {', '.join(df.columns)}

📋 Recent Queries (Last 5):
"""

        if not df.empty:
            # Better column selection and handling of NaN values
            display_columns = ['ID', 'Original_Query', 'Routing_Type', 'Synthesis_Method', 'Identified_Topic', 'Status', 'Timestamp']
            existing_columns = [col for col in display_columns if col in df.columns]

            recent = df.tail(5)[existing_columns].copy()

            # Clean up NaN values for display
            for col in recent.columns:
                if col not in ['ID']:  # Don't modify ID column
                    recent[col] = recent[col].fillna('Not Set')

            # Truncate long text for better display
            if 'Original_Query' in recent.columns:
                recent['Original_Query'] = recent['Original_Query'].apply(
                    lambda x: str(x)[:50] + '...' if len(str(x)) > 50 else str(x)
                )

            stats += recent.to_string(index=False, max_colwidth=50)

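        # Add a summary at the end. The latest timing is looked up defensively,
        # because there may be no completed query with a recorded processing time.
        latest_time = "not available"
        if 'Processing_Time_Seconds' in df.columns:
            recorded_times = df.loc[df['Status'] == 'completed', 'Processing_Time_Seconds'].dropna()
            if not recorded_times.empty:
                latest_time = f"{recorded_times.iloc[-1]:.2f}s"

        stats += f"""

📊 SUMMARY:
- Database is functioning properly with {total_queries} total queries
- {completed} queries have been successfully processed
- Enhanced tracking is capturing routing decisions and topic identification
- Latest query processing time: {latest_time}
"""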

        return stats

    except Exception as e:
        error_details = f"""
  ❌ ERROR READING DATABASE: {str(e)}

  🔧 TROUBLESHOOTING:
  - Check if 'cola_database.csv' exists in the current directory
  - Verify database schema is correct
  - Ensure no file permissions issues

  📋 Available files: {', '.join([f for f in os.listdir('.') if f.endswith('.csv')])}
  """
        return error_details

### Resilient LLM Interface Design

**Key Design Patterns:**
1. **Fixed-Delay Retry Pattern**: Up to 100 attempts with a constant 2-second pause between them (the delay does not grow between attempts)
2. **Graceful Degradation**: Error messages instead of crashes
3. **Deterministic Processing**: Temperature=0 for consistency
4. **Comprehensive Error Logging**: Tracks failures for monitoring

In [None]:
def get_completion(prompt):
    '''
    Send prompt to Llama-3-8B model with retry logic and error handling.
    Makes up to 100 attempts with 2-second delays, returns model response or "Error" on failure.
    '''
    max_retries = 100

    for i in range(max_retries):
        try:
            messages = [{"role": "user", "content": prompt}]
            response = client.chat.completions.create(
                model="llama-3-8b-instruct",
                messages=messages,
                temperature=0
            )
            return response.choices[0].message.content
        except Exception as e:  # Generic exception handling
            if i < max_retries - 1:
                logging.warning(f"Attempt {i+1} failed: {e}")
                time.sleep(2)
            else:
                # Log the prompt that failed (this function has no `instruction` argument)
                logging.error(f'Max retries reached for prompt: {prompt}. Error: {e}')
                return "Error"



In [None]:
def get_completion_with_role(role, instruction, content):
    '''
    Send role-based prompt to Llama-3-8B model with system role assignment.
    Sets AI role via system message, combines instruction and content for user message, includes retry logic on failure.
    '''
    max_retries = 100

    for i in range(max_retries):
        try:
            messages = [
                {"role": "system", "content": f"You are a {role}."},
                {"role": "user", "content": f"{instruction}\n{content}"}
            ]
            response = client.chat.completions.create(
                model="llama-3-8b-instruct",
                messages=messages,
                temperature=0
            )
            return response.choices[0].message.content
        except Exception as e:  # Generic exception handling
            if i < max_retries - 1:
                logging.warning(f"Attempt {i+1} failed: {e}")
                time.sleep(2)
            else:
                logging.error(f'Max retries reached for prompt: {instruction}. Error: {e}')
                return "Error"
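Both helpers in this section pause for a constant 2 seconds between attempts. If a growing delay is preferred, a minimal sketch of exponential backoff with jitter might look like the following. The `retry_with_backoff` name, its parameters, and the injectable `sleep` argument are assumptions for illustration and are not part of the notebook:

```python
import random
import time

def retry_with_backoff(call, max_retries=5, base_delay=1.0, max_delay=30.0, sleep=time.sleep):
    """Call `call()` until it succeeds, doubling the wait after each failure."""
    for attempt in range(max_retries):
        try:
            return call()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of attempts: surface the last error to the caller
            # 1x, 2x, 4x ... the base delay, capped, plus up to 10% jitter
            delay = min(base_delay * (2 ** attempt), max_delay)
            sleep(delay + random.uniform(0, delay * 0.1))

# Demo: a stand-in for the LLM call that fails twice, then succeeds.
attempts = {"count": 0}

def flaky_call():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"

# Record the delays instead of actually sleeping so the demo runs instantly.
recorded_delays = []
result = retry_with_backoff(flaky_call, sleep=recorded_delays.append)
print(result, len(recorded_delays))
```

Unlike the notebook's helpers, this sketch re-raises the final exception rather than returning the string `"Error"`; a caller that relies on the string sentinel would need to catch the exception and return it explicitly.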

## Intent Disambiguation Engine

### Ambiguity Detection Algorithm

**Disambiguation Strategy Analysis:**
1. **Semantic Ambiguity Recognition**: Identifies homonyms and polysemous words
2. **Domain-Level Disambiguation**: Separates different subject areas
3. **Context-Level Refinement**: Provides different perspectives
4. **Structured Output Format**: Enforces consistent formatting

In [None]:
def generate_intent_options(original_query):
    '''
    Generate 3 domain/topic disambiguation options for ambiguous queries.
    Identifies ambiguous terms and provides different interpretations (e.g., Java as programming language vs. island vs. coffee).
    '''
    prompt = f"""Query: "{original_query}"

        CRITICAL: I need you to identify if this query has AMBIGUOUS WORDS that could mean completely different things.

        Look for words that could be:
        - Programming language vs. place vs. other meanings (Java, Python, Ruby, etc.)
        - Company vs. fruit vs. other (Apple, Orange, etc.)
        - Person vs. place vs. concept (Tesla, Darwin, etc.)
        - Multiple different meanings entirely

        If you find ambiguous words, give me 3 DIFFERENT DOMAINS/SUBJECTS.
        If no ambiguous words, give me 3 different CONTEXTS for the same topic. Be explanatory but brief:
        WRONG: iPhone models (Travel)
        RIGHT: Best iPhone Models for Travel
        Exception: If the ambiguous word relates to iPhone, ALWAYS exclude food/fruit.

        WRONG (all same domain):
        - Java web development features
        - Java mobile app development
        - Java enterprise applications

        RIGHT (different domains):
        - Java (programming language)
        - Java (Indonesian island)
        - Java (coffee culture)

        Format: Topic (context)

        Respond with exactly 3 lines, no explanations:"""

    try:
        response = get_completion(prompt)

        # Clean and parse the response - be more aggressive about filtering
        lines = [line.strip() for line in response.strip().split('\n') if line.strip()]

        options = []
        # More comprehensive filtering for instruction text
        skip_phrases = [
            "here are", "results", "the query", "could refer", "examples",
            "format", "write only", "respond with", "provide", "interpretations",
            "different meanings", "ambiguous", "critical"
        ]

        for line in lines:
            # Skip lines that contain instruction-like phrases
            line_lower = line.lower()
            if any(phrase in line_lower for phrase in skip_phrases):
                continue

            # Remove numbering/bullets more aggressively (re is already imported at the top of the notebook)
            cleaned_line = re.sub(r'^[0-9]+[\.\)\-\s]+', '', line)
            cleaned_line = re.sub(r'^[\-\*\•]\s+', '', cleaned_line)

            # Only keep lines that look like actual topic options (should contain parentheses ideally)
            if cleaned_line and len(cleaned_line) > 3 and not any(phrase in cleaned_line.lower() for phrase in skip_phrases):
                options.append(cleaned_line)

        # Take first 3 options
        if len(options) >= 3:
            return options[:3]

        # If we don't get good results, create manual disambiguation for common terms
        query_lower = original_query.lower()

        # Check for common ambiguous terms
        if 'java' in query_lower:
            return [
                "Java (programming language)",
                "Java (Indonesian island)",
                "Java (coffee culture)"
            ]
        elif 'python' in query_lower:
            return [
                "Python (programming language)",
                "Python (snake species)",
                "Python (Monty Python comedy)"
            ]
        elif 'tesla' in query_lower:
            return [
                "Tesla (car company)",
                "Tesla (Nikola Tesla scientist)",
                "Tesla (band/music)"
            ]
        else:
            # Generic contextual fallback
            return [
                f"Technical/professional information about {original_query}",
                f"General educational information about {original_query}",
                f"Practical applications of {original_query}"
            ]

    except Exception as e:
        print(f"Error generating intent options: {e}")
        return [
            f"Technical information about {original_query}",
            f"General information about {original_query}",
            f"Practical guide for {original_query}"
        ]
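The list-marker cleanup inside `generate_intent_options` can be tried in isolation. The sketch below is a standalone variant, not the notebook's exact code: `clean_option_lines` and `LIST_PREFIX` are illustrative names, and the pattern is slightly stricter than the original so that a leading number without a following `.` or `)` (for example a year) is left intact:

```python
import re

# Matches leading "1." or "2)" numbering, or "-", "*", "•" bullets followed by whitespace.
LIST_PREFIX = re.compile(r"^\s*(?:\d+[.)]\s*|[-*•]\s+)")

def clean_option_lines(response, limit=3):
    """Return up to `limit` non-trivial lines with list markers removed."""
    options = []
    for line in response.splitlines():
        cleaned = LIST_PREFIX.sub("", line).strip()
        if len(cleaned) > 3:  # same minimum length as the notebook's filter
            options.append(cleaned)
    return options[:limit]

sample = "1. Java (programming language)\n- Java (Indonesian island)\n\n• Java (coffee culture)"
print(clean_option_lines(sample))
```

The skip-phrase filter from the notebook is deliberately left out here, so instruction-like lines echoed by the model would still need to be removed separately.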

In [None]:
def show_intent_options_clean(chatbot_history, state, options):
    '''
    Display intent clarification options to user in chatbot interface.
    Formats disambiguation options with numbered choices, adds "Other" option, appends to chat history.
    '''
    options_text = "🎯 **Please clarify your intent:**\n\n"
    options_text += "Select the option that best matches what you're looking for:\n\n"

    for i, option in enumerate(options, 1):
        options_text += f"**Option {i}:** {option}\n\n"

    options_text += "**Option 4:** 🔧 **Other** - Specify your own topic/domain\n\n"
    options_text += "👇 **Click the corresponding Option button below to proceed.**"

    # ADD to existing history, don't replace
    intent_message = {"role": "assistant", "content": options_text}
    updated_history = chatbot_history + [intent_message]

    return updated_history, state

In [None]:
def generate_answer_type_options(query, selected_topic_intent):
    '''
    Generate 3 answer-type options based on query and selected topic to clarify desired response format.
    Determines appropriate response styles (informative, practical tips, basic overview, etc.) with fallback logic.
    '''
    prompt = f"""Query: "{query}"
        Selected Topic: "{selected_topic_intent}"

        The user wants to know about this topic. What type of answer would be most helpful?

        Generate 3 different ANSWER TYPE options that would be appropriate for this query:

        Consider these categories:
        - Informative (detailed explanation, facts, background information)
        - Practical Tips (actionable advice, how-to guidance, steps to follow)
        - Basic Overview (simple introduction, key points, beginner-friendly)
        - Expert Analysis (in-depth professional perspective, technical details)
        - Comparison/Evaluation (pros/cons, alternatives, recommendations)
        - Problem-Solving (solutions, troubleshooting, addressing specific issues)

        Format each option as: "Answer Type (brief description)"

        Examples:
        - Informative (comprehensive background and facts)
        - Practical Tips (step-by-step actionable guidance)
        - Basic Overview (simple introduction for beginners)

        Respond with exactly 3 lines, no explanations:"""

    try:
        response = get_completion(prompt)

        # Parse response similar to intent options
        lines = [line.strip() for line in response.strip().split('\n') if line.strip()]

        options = []
        skip_phrases = [
            "here are", "results", "the query", "could refer", "examples",
            "format", "write only", "respond with", "provide", "options",
            "different types", "answer type"
        ]

        for line in lines:
            line_lower = line.lower()
            if any(phrase in line_lower for phrase in skip_phrases):
                continue

            # Remove numbering/bullets (re is already imported at the top of the notebook)
            cleaned_line = re.sub(r'^[0-9]+[\.\)\-\s]+', '', line)
            cleaned_line = re.sub(r'^[\-\*\•]\s+', '', cleaned_line)

            if cleaned_line and len(cleaned_line) > 3:
                options.append(cleaned_line)

        # Take first 3 options
        if len(options) >= 3:
            return options[:3]

        # Fallback options based on query analysis
        query_lower = query.lower()

        if any(word in query_lower for word in ['how to', 'steps', 'guide', 'tutorial']):
            return [
                "Practical Tips (step-by-step actionable guidance)",
                "Informative (detailed explanation and background)",
                "Basic Overview (simple introduction for beginners)"
            ]
        elif any(word in query_lower for word in ['what is', 'explain', 'about']):
            return [
                "Informative (comprehensive background and facts)",
                "Basic Overview (simple introduction for beginners)",
                "Expert Analysis (in-depth professional perspective)"
            ]
        else:
            # Generic fallback
            return [
                "Informative (comprehensive background and facts)",
                "Practical Tips (actionable advice and guidance)",
                "Basic Overview (simple introduction and key points)"
            ]

    except Exception as e:
        print(f"Error generating answer type options: {e}")
        return [
            "Informative (comprehensive background and facts)",
            "Practical Tips (actionable advice and guidance)",
            "Basic Overview (simple introduction and key points)"
        ]

In [None]:
def show_answer_type_options_clean(chatbot_history, state, options):
    '''
    Display answer-type selection options to user while preserving chat history.
    Formats response format choices with numbered options, adds "Other" option, appends to existing conversation.
    '''
    options_text = "📝 **What type of answer do you need?**\n\n"
    options_text += "Choose the response format that best fits your needs:\n\n"

    for i, option in enumerate(options, 1):
        options_text += f"**Option {i}:** {option}\n\n"

    options_text += "**Option 4:** 🔧 **Other** - Specify your preferred answer format\n\n"
    options_text += "👇 **Click the corresponding Option button below.**"

    # ADD to existing history, preserving user query and previous messages
    answer_type_message = {"role": "assistant", "content": options_text}
    updated_history = chatbot_history + [answer_type_message]

    return updated_history, state

## Intelligent Routing System

### Multi-Tier Routing Architecture

**Routing Strategy Patterns:**
1. **Hierarchical Decision Making**: Dictionary → Partial matching → LLM fallback
2. **Comprehensive Tracking**: Every routing decision logged
3. **Fallback Resilience**: If the LLM call fails or returns an invalid label, keyword-based analysis still returns a synthesis method
4. **Performance Optimization**: Fast dictionary lookup before LLM calls
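The hierarchy listed above (exact dictionary match, then partial match, then LLM fallback) can be sketched in a few lines. The `ANSWER_TYPE_ROUTES` dictionary, the `route_answer_type` helper, and the method labels returned alongside the result are hypothetical stand-ins; the notebook's actual mapping is not reproduced here:

```python
# Hypothetical answer-type dictionary, for illustration only.
ANSWER_TYPE_ROUTES = {
    "practical tips": "solution_focused",
    "comparison/evaluation": "decision_making",
    "informative": "informational",
}

def route_answer_type(answer_type, llm_fallback):
    """Try an exact match, then a partial match, then the supplied fallback callable."""
    # Use only the label before any parenthesised description.
    label = (answer_type or "").split("(")[0].strip().lower()
    if label in ANSWER_TYPE_ROUTES:
        return ANSWER_TYPE_ROUTES[label], "dictionary_exact"
    for key, method in ANSWER_TYPE_ROUTES.items():
        if label and (key in label or label in key):
            return method, "dictionary_partial"
    return llm_fallback(answer_type), "llm_fallback"

def always_informational(_answer_type):
    return "informational"  # stand-in for the LLM routing call

print(route_answer_type("Practical Tips (step-by-step guidance)", always_informational))
print(route_answer_type("Quick practical tips", always_informational))
print(route_answer_type("Something else", always_informational))
```

Passing the fallback as a callable keeps the cheap dictionary lookup free of any network dependency, so the LLM is only contacted when both lookups miss.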

In [None]:
def intelligent_routing_agent(original_query, selected_answer_type, rewritten_query=None):
    '''
    Analyze query intent using LLM when dictionary-based routing fails.
    Determines optimal synthesis method (decision_making, solution_focused, or informational) based on query analysis.
    '''

    if selected_answer_type is None:
        print("WARNING: selected_answer_type is None, using LLM to analyze query intent")
        analysis_target = original_query
        answer_type_description = "Unknown - need to analyze query intent"
    else:
        analysis_target = f"Query: {original_query}\nAnswer Type Requested: {selected_answer_type}"
        answer_type_description = selected_answer_type

    routing_prompt = f"""You are an intelligent routing agent for a collaborative analysis system. Your job is to analyze user queries and determine the best synthesis approach.

            AVAILABLE SYNTHESIS METHODS:
            1. **DECISION_MAKING**: Use when the user wants evaluation, comparison, recommendations, advice, or needs to make a choice between options.

            2. **SOLUTION_FOCUSED**: Use when the user wants practical tips, step-by-step solutions, implementation guidance, troubleshooting, or actionable advice.

            3. **INFORMATIONAL**: Use when the user wants facts, explanations, overviews, background information, or educational content.

            ANALYSIS TARGET:
            {analysis_target}

            RESPOND WITH ONLY ONE WORD: "DECISION_MAKING", "SOLUTION_FOCUSED", or "INFORMATIONAL"

            Think about what the user really wants as an outcome from their query."""

    try:
        messages = [{"role": "user", "content": routing_prompt}]
        response = client.chat.completions.create(
            model="llama-3-8b-instruct",
            messages=messages,
            temperature=0
        )

        routing_decision = response.choices[0].message.content.strip().upper()

        valid_methods = ["DECISION_MAKING", "SOLUTION_FOCUSED", "INFORMATIONAL"]
        if routing_decision in valid_methods:
            method = routing_decision.lower()
            print(f"🤖 INTELLIGENT ROUTING: LLM determined '{method}' based on query analysis")
            return method
        else:
            print(f"⚠️ LLM returned invalid method '{routing_decision}', using fallback analysis")
            return fallback_intelligent_analysis(original_query, selected_answer_type)

    except Exception as e:
        print(f"ERROR in intelligent routing agent: {e}")
        return fallback_intelligent_analysis(original_query, selected_answer_type)
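The routing agent accepts the model's reply only when, after `strip().upper()`, it equals one of the three labels exactly, so a reply wrapped in quotes or followed by a period drops to the keyword fallback. A minimal sketch of a more tolerant parser, assuming the same three labels; `LABEL_PATTERN` and `extract_routing_label` are illustrative names, not part of the notebook:

```python
import re

# Accept an underscore, space, or hyphen between the two words of a label.
LABEL_PATTERN = re.compile(r"DECISION[_ -]MAKING|SOLUTION[_ -]FOCUSED|INFORMATIONAL")

def extract_routing_label(raw_response):
    """Return the first recognised label in lower snake_case, or None if absent."""
    match = LABEL_PATTERN.search((raw_response or "").upper())
    if match is None:
        return None
    return re.sub(r"[ -]", "_", match.group(0)).lower()

print(extract_routing_label('"SOLUTION_FOCUSED".'))
print(extract_routing_label("Decision making"))
print(extract_routing_label("I am not sure"))
```

A `None` result would then trigger the keyword fallback, just as an invalid label does in the function above.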

In [None]:
def fallback_intelligent_analysis(original_query, selected_answer_type):
    '''
    Backup routing mechanism using keyword pattern matching when LLM-based routing fails.
    Scores query against decision-making and solution-focused keyword lists, returns highest scoring category.
    '''
    query_lower = original_query.lower()
    answer_type_lower = (selected_answer_type or "").lower()
    combined_text = f"{query_lower} {answer_type_lower}"

    # Decision-making indicators
    decision_keywords = [
        "should i", "which", "better", "best", "recommend", "choose", "decide",
        "compare", "versus", "vs", "pros and cons", "advantages", "disadvantages",
        "worth it", "advice", "suggest", "opinion", "prefer", "evaluation"
    ]

    # Solution-focused indicators
    solution_keywords = [
        "how to", "steps", "guide", "tutorial", "solve", "fix", "implement",
        "create", "build", "make", "do", "achieve", "accomplish", "tips",
        "practical", "actionable", "process", "method", "technique"
    ]

    decision_score = sum(1 for keyword in decision_keywords if keyword in combined_text)
    solution_score = sum(1 for keyword in solution_keywords if keyword in combined_text)

    # Return lowercase labels so the result matches what intelligent_routing_agent
    # returns on its primary (LLM) path
    if decision_score > solution_score and decision_score > 0:
        print(f"🔍 FALLBACK ANALYSIS: Decision-making intent detected (score: {decision_score})")
        return "decision_making"
    elif solution_score > 0:
        print(f"🔍 FALLBACK ANALYSIS: Solution-focused intent detected (score: {solution_score})")
        return "solution_focused"
    else:
        print("🔍 FALLBACK ANALYSIS: Informational intent (default)")
        return "informational"
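The keyword scores above rely on substring tests, so short keywords such as "do" or "vs" also match inside longer words ("does", "window", "canvas"). A minimal sketch of whole-word matching, offered as one possible refinement rather than the notebook's method; `count_keyword_hits` is an illustrative name and the keyword list is shortened for the demo:

```python
import re

def count_keyword_hits(text, keywords):
    """Count keywords that occur as whole words or phrases, not as substrings."""
    hits = 0
    for keyword in keywords:
        pattern = r"\b" + re.escape(keyword) + r"\b"
        if re.search(pattern, text, flags=re.IGNORECASE):
            hits += 1
    return hits

solution_keywords = ["how to", "do", "make", "fix"]
print(count_keyword_hits("What does the window manager handle?", solution_keywords))
print(count_keyword_hits("How to fix a flat tyre", solution_keywords))
```

With plain substring matching the first query would score 1 because "do" appears inside "does" and "window"; with word boundaries it scores 0, while the second query still scores 2.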

In [None]:
def process_selected_answer_type_intelligent_dynamic(selected_answer_type, selected_topic_intent, original_query, query_id, chatbot_history, state):
    '''
    Process selected answer type using intelligent routing and dynamic UI management.
    Handles CollabSearch pipeline execution, cleans chat history, manages processing state, and provides error recovery.
    '''
    try:
        # Update state with processing info
        if isinstance(state, dict):
            state["processing_query_id"] = query_id
            state["selected_topic_intent"] = selected_topic_intent
            state["processing"] = True
            state["show_options"] = False

        # Process with intelligent routing
        answer = add_predictions_sequential_intelligent(
            original_query,
            selected_topic_intent,
            selected_answer_type,
            query_id,
            state
        )

        # Remove the last two messages (custom selection + processing messages)
        cleaned_history = chatbot_history[:-2] if len(chatbot_history) >= 2 else []

        # Add only the final answer
        final_message = {"role": "assistant", "content": str(answer)}
        final_history = cleaned_history + [final_message]

        # Reset processing state (guarded, since state may not be a dict)
        if isinstance(state, dict):
            state["processing"] = False
            state["show_options"] = False

        yield final_history, state, "", gr.update(visible=False), gr.update(visible=False)

    except Exception as e:
        error_msg = f"❌ **Error processing selected answer type:** {str(e)}"
        error_message = {"role": "assistant", "content": error_msg}

        # Clean up confirmation messages and show error
        cleaned_history = chatbot_history[:-2] if len(chatbot_history) >= 2 else chatbot_history
        error_history = cleaned_history + [error_message]

        # Reset state on error (guarded, so the handler itself cannot raise)
        if isinstance(state, dict):
            state["processing"] = False
            state["show_options"] = False

        yield error_history, state, "", gr.update(visible=False), gr.update(visible=False)

In [None]:
def route_synthesis_by_answer_type_intelligent(selected_answer_type, original_query=None, rewritten_query=None):
    '''
    Wrapper function for synthesis routing with backward compatibility.
    Calls comprehensive tracking function but returns only synthesis method to maintain existing code compatibility.
    '''
    # Call the new comprehensive tracking function
    synthesis_method, routing_info = route_synthesis_with_comprehensive_tracking(
        selected_answer_type, original_query, rewritten_query
    )
    return synthesis_method

In [None]:
def route_synthesis_with_comprehensive_tracking(selected_answer_type, original_query=None, rewritten_query=None, query_id=None):
    '''
    Enhanced synthesis routing with detailed decision-process tracking and fallback mechanisms.
    Attempts dictionary matching (exact/partial) for answer types, falls back to LLM analysis, returns method and routing metadata.
    '''
    # Initialize tracking info
    routing_info = {
        'routing_type': 'UNKNOWN',
        'routing_method': 'fallback',
        'dictionary_match': '',
        'match_found': False,
        'answer_type_processed': selected_answer_type or 'None'
    }

    if not selected_answer_type or len(selected_answer_type.strip()) == 0:
        print("⚠️ No answer type provided - routing to LLM analysis")
        routing_info.update({
            'routing_type': 'LLM',
            'routing_method': 'no_answer_type_provided',
            'match_found': False
        })
        result = intelligent_routing_agent(original_query, selected_answer_type, rewritten_query)
        return result, routing_info

    # Extract main answer type
    answer_type_main = selected_answer_type.split('(')[0].strip().lower()
    print(f"🔍 Processing answer type: '{answer_type_main}'")

    try:
        # Comprehensive routing dictionary
        synthesis_routing = {
            # Decision-making patterns
            'comparison': 'DECISION_MAKING',
            'recommendation': 'DECISION_MAKING',
            'advice': 'DECISION_MAKING',
            'evaluation': 'DECISION_MAKING',
            'assessment': 'DECISION_MAKING',
            'pros and cons': 'DECISION_MAKING',
            'which is better': 'DECISION_MAKING',
            'should i': 'DECISION_MAKING',
            'best option': 'DECISION_MAKING',
            'choose': 'DECISION_MAKING',
            'decision': 'DECISION_MAKING',
            'versus': 'DECISION_MAKING',
            'vs': 'DECISION_MAKING',

            # Solution-focused patterns
            'practical tips': 'SOLUTION_FOCUSED',
            'problem-solving': 'SOLUTION_FOCUSED',
            'how to': 'SOLUTION_FOCUSED',
            'step by step': 'SOLUTION_FOCUSED',
            'step-by-step tutorial': 'SOLUTION_FOCUSED',
            'guide': 'SOLUTION_FOCUSED',
            'tutorial': 'SOLUTION_FOCUSED',
            'implementation': 'SOLUTION_FOCUSED',
            'troubleshooting': 'SOLUTION_FOCUSED',
            'fix': 'SOLUTION_FOCUSED',
            'solve': 'SOLUTION_FOCUSED',
            'create': 'SOLUTION_FOCUSED',
            'build': 'SOLUTION_FOCUSED',
            'actionable': 'SOLUTION_FOCUSED',

            # Informational patterns
            'expert analysis': 'INFORMATIONAL',
            'informative': 'INFORMATIONAL',
            'basic overview': 'INFORMATIONAL',
            'explanation': 'INFORMATIONAL',
            'background': 'INFORMATIONAL',
            'what is': 'INFORMATIONAL',
            'overview': 'INFORMATIONAL',
            'summary': 'INFORMATIONAL',
            'details': 'INFORMATIONAL',
            'information': 'INFORMATIONAL',
            'facts': 'INFORMATIONAL',
            'learn': 'INFORMATIONAL',
            'understand': 'INFORMATIONAL'
        }

        # Exact match check
        if answer_type_main in synthesis_routing:
            method = synthesis_routing[answer_type_main]
            print(f"✅ EXACT DICTIONARY MATCH: '{answer_type_main}' → '{method}'")
            routing_info.update({
                'routing_type': 'DICTIONARY',
                'routing_method': 'exact_match',
                'dictionary_match': answer_type_main,
                'match_found': True
            })
            return method.lower(), routing_info

        # Partial match check
        for key, method in synthesis_routing.items():
            if key in answer_type_main or answer_type_main in key:
                print(f"✅ PARTIAL DICTIONARY MATCH: '{answer_type_main}' ↔ '{key}' → '{method}'")
                routing_info.update({
                    'routing_type': 'DICTIONARY',
                    'routing_method': 'partial_match',
                    'dictionary_match': key,
                    'match_found': True
                })
                return method.lower(), routing_info

        # No dictionary match found
        print(f"❌ NO DICTIONARY MATCH for '{answer_type_main}'")
        print("🤖 Routing to LLM for intelligent analysis...")

    except Exception as e:
        print(f"❌ Dictionary lookup error: {e}")

    # LLM routing fallback
    routing_info.update({
        'routing_type': 'LLM',
        'routing_method': 'dictionary_fallback',
        'match_found': False
    })

    result = intelligent_routing_agent(original_query, selected_answer_type, rewritten_query)
    return result, routing_info
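
The lookup order used above (exact match, then partial match, then LLM fallback) can be sketched in isolation. This is a simplified illustration rather than the framework's implementation: `ROUTES`, `route`, and the `llm_fallback` stub are hypothetical names, and the table is trimmed to three entries.

```python
ROUTES = {
    "comparison": "DECISION_MAKING",
    "practical tips": "SOLUTION_FOCUSED",
    "basic overview": "INFORMATIONAL",
}

def route(selected_answer_type, llm_fallback=lambda text: "informational"):
    """Return (method, how) via exact match, partial match, then fallback."""
    main = (selected_answer_type or "").split("(")[0].strip().lower()
    if not main:
        return llm_fallback(selected_answer_type), "llm"
    if main in ROUTES:
        return ROUTES[main].lower(), "exact_match"
    # Partial match: the first key in insertion order wins
    for key, method in ROUTES.items():
        if key in main or main in key:
            return method.lower(), "partial_match"
    return llm_fallback(selected_answer_type), "llm"

print(route("Comparison (pros and cons)"))
print(route("Quick practical tips"))
print(route("Timeline"))
```

Because the partial match accepts a substring in either direction, a very short answer type matches the first key that happens to contain it, and the result depends on dictionary order. Ordering keys from most to least specific, or requiring whole-word matches, would make this step more predictable.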


# REW

In [None]:
def rewrite_query_with_dual_intent(original_query, selected_topic, selected_answer_type, state):
    '''
    Rewrite the query using the selected topic and answer type, then store the result in state["working_query"].
    '''
    answer_type_main = selected_answer_type.split('(')[0].strip()
    answer_type_description = selected_answer_type.split('(')[1].strip(')') if '(' in selected_answer_type else ""

    instruction = f"""You have an original user query, their selected topic/domain, and their desired answer type.

    Your task: Rewrite the query to be more specific and actionable based on these clarifications.

    Original query: "{original_query}"
    Selected topic/domain: "{selected_topic}"
    Desired answer type: "{answer_type_main}"
    Answer type context: "{answer_type_description}"

    Guidelines:
    1. Keep the core intent of the original query
    2. Make it more specific to the selected topic; the user has already specified which term they mean. DO NOT CHANGE TOPICS; the selected topic is: {selected_topic}.
    3. Frame it to expect the desired answer type
    4. Make it actionable and clear
    5. Don't change the fundamental question; keep the query within the selected topic ({selected_topic})

    Return only the rewritten query, nothing else."""

    working_query = get_completion(instruction)

    # Clean up the response
    working_query = working_query.strip()
    if working_query.startswith('"') and working_query.endswith('"'):
        working_query = working_query[1:-1]

    # Store in state for later use
    state["working_query"] = working_query

    return working_query
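
The answer type arrives as a string of the form `Name (description)`. The `split('(')[1].strip(')')` idiom used above keeps only the text up to the next opening parenthesis, so a nested parenthesis truncates the description. A small sketch of a more tolerant split follows; `split_answer_type` is a hypothetical helper, not part of the framework.

```python
def split_answer_type(selected_answer_type):
    """Split 'Name (description)' into (name, description)."""
    name, _, rest = selected_answer_type.partition("(")
    description = rest.strip()
    if description.endswith(")"):
        description = description[:-1]  # drop only the closing parenthesis
    return name.strip(), description.strip()

print(split_answer_type("Practical Tips (actionable steps)"))
print(split_answer_type("Comparison"))
print(split_answer_type("Guide (setup (Linux) and use)"))
```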

## Multi-Agent Analysis System

### Dynamic Role Assignment and Expert Analysis

The system employs three distinct expert perspectives:
1. **Local Analysis**: Domain-specific expert with deep topical knowledge
2. **Expert Analysis**: Subject matter expert with authoritative perspective
3. **User Analysis**: General analyst focusing on practical user needs

In [None]:
def get_roles(query, topic):
    '''
    Generate 3 expert roles dynamically based on the clarified topic for multi-perspective analysis.
    Uses the LLM to create complementary specialist roles within the topic domain, with retry logic and fallback roles.
    '''
    max_retries = 100
    prompt_roles = f"""You are a Recruiting Manager. The user has clarified they want to know about: "{topic}"

                        IMPORTANT CONSTRAINTS:
                        - Focus ONLY on the clarified topic: "{topic}"
                        - IGNORE any ambiguous terms from the original query
                        - All 3 expert roles must be specialists within the scope of: "{topic}"
                        - DO NOT go outside this specific domain

                        Provide 3 different expert roles who specialize in "{topic}" and can analyze the question from different perspectives within this domain.

                        The 3 roles should have complementary expertise areas within "{topic}".

                        Return ONLY a Python list in this exact format:
                        ["{topic}", "expert role 1", "expert role 2", "expert role 3"]

                        No explanations, no other text, just the list."""

    for i in range(max_retries):
        try:
            messages = [{"role": "user", "content": prompt_roles}]
            response = client.chat.completions.create(
                model="llama-3-8b-instruct",
                messages=messages,
                temperature=0
            )

            response_text = response.choices[0].message.content.strip()

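            if response_text.startswith('[') and response_text.endswith(']'):
                # Parse as a literal instead of eval() so model output is never executed as code
                import ast  # standard library; imported locally so this cell stands alone
                definition_list = ast.literal_eval(response_text)
                if isinstance(definition_list, list) and len(definition_list) >= 4:
                    return definition_list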

            raise ValueError("Invalid response format")

        except Exception as e:
            if i < max_retries - 1:
                time.sleep(2)
            else:
                # Fallback
                return [
                    "General Knowledge",
                    "Subject Matter Expert",
                    "Technical Specialist",
                    "End User Analyst"
                ]
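
The role list comes back from the model as text, so it has to be parsed and validated before use. The sketch below shows one way to do that with `ast.literal_eval`, which accepts only Python literals and therefore never executes the model's output. `parse_role_list` and its `expected_len` parameter are hypothetical names introduced for illustration.

```python
import ast

def parse_role_list(response_text, expected_len=4):
    """Parse model output as a list literal of strings, or return None."""
    text = response_text.strip()
    if not (text.startswith("[") and text.endswith("]")):
        return None
    try:
        parsed = ast.literal_eval(text)
    except (ValueError, SyntaxError, TypeError):
        return None  # not a plain literal, e.g. contains a function call
    if (
        isinstance(parsed, list)
        and len(parsed) >= expected_len
        and all(isinstance(item, str) for item in parsed)
    ):
        return parsed
    return None

print(parse_role_list('["Coffee", "Barista", "Agronomist", "Roaster"]'))
print(parse_role_list('[__import__("os").getcwd()]'))
print(parse_role_list("Sure! Here is the list you asked for."))
```

Returning `None` on any malformed response lets the caller decide whether to retry or use the fallback roles.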

In [None]:
def define_roles(definition_list, state):
    """
    Store the three generated roles in state["target_role_map"]; definition_list[0] is the topic and is not stored.
    """
    state["target_role_map"] = {
        "Local": definition_list[1],
        "Expert": definition_list[2],
        "User Analysis": definition_list[3]
    }

In [None]:
def local_analysis_enhanced(query, topic, answer_type, state=None):
    '''
    Generate enhanced local analysis response using dynamically assigned expert role and answer type formatting.
    Extracts role from state mapping, formats instructions with answer type requirements, calls LLM with role-based prompt.
    '''
    role = state["target_role_map"].get("Local", "Expert Analyst")

    # Extract the main answer type from the selection
    answer_type_main = answer_type.split('(')[0].strip()

    instruction = f"""You are a {role} with deep knowledge about {topic}.

    User Query: "{query}"
    Requested Answer Type: {answer_type}

    As a {role}, provide your professional analysis addressing this query.

    IMPORTANT: Format your response as {answer_type_main.upper()}:

    {get_answer_type_instructions(answer_type_main)}

    Your response should reflect the expertise and viewpoint that defines your role as a {role} while following the requested answer format."""

    return get_completion_with_role(role, instruction, query)

def expert_analysis_enhanced(query, topic, answer_type, state=None):
    '''
    Generate enhanced expert analysis response with specialized role assignment and answer type consideration.
    Uses subject matter expert role from state, incorporates answer type instructions, provides authoritative domain analysis.
    '''
    role = state["target_role_map"].get("Expert", "Subject Matter Expert")

    answer_type_main = answer_type.split('(')[0].strip()

    instruction = f"""You are a {role} specializing in {topic}.

    User Query: "{query}"
    Requested Answer Type: {answer_type}

    Provide your professional expert analysis of this query.

    IMPORTANT: Format your response as {answer_type_main.upper()}:

    {get_answer_type_instructions(answer_type_main)}

    Your analysis should reflect the authority and comprehensive understanding that comes from being a recognized {role} in this domain."""

    return get_completion_with_role(role, instruction, query)

def user_analysis_enhanced(query, topic, answer_type, state=None):
    '''
    Generate enhanced user-perspective analysis with complementary expert viewpoint and answer type formatting.
    Applies general analyst role from state mapping, formats response according to requested answer type, provides distinct analytical value.
    '''
    role = state["target_role_map"].get("User Analysis", "General Analyst")
    answer_type_main = answer_type.split('(')[0].strip()

    instruction = f"""You are a {role} with expertise in {topic}.

    User Query: "{query}"
    Requested Answer Type: {answer_type}

    Analyze this query from your specialized perspective as a {role}.

    IMPORTANT: Format your response as {answer_type_main.upper()}:

    {get_answer_type_instructions(answer_type_main)}

    Your analysis should complement other expert perspectives while offering the distinct value that only a {role} can provide."""

    return get_completion_with_role(role, instruction, query)
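
The three analysis functions above share one prompt skeleton and differ only in the role key and the framing sentence. They also declare `state=None` as a default but index `state` directly, which raises a `TypeError` when no state is passed. The sketch below shows one possible way to factor out the shared parts. `role_from_state` and `build_role_prompt` are hypothetical helpers, and the prompt is abbreviated.

```python
def role_from_state(state, key, default):
    """Look up a role, tolerating state=None or a missing role map."""
    role_map = (state or {}).get("target_role_map", {})
    return role_map.get(key, default)

def build_role_prompt(role, framing, topic, query, answer_type, format_instructions):
    """Assemble the shared prompt skeleton used by all three perspectives."""
    answer_type_main = answer_type.split("(")[0].strip()
    return (
        f"You are a {role} {framing} {topic}.\n\n"
        f'User Query: "{query}"\n'
        f"Requested Answer Type: {answer_type}\n\n"
        f"IMPORTANT: Format your response as {answer_type_main.upper()}:\n\n"
        f"{format_instructions}"
    )

role = role_from_state(None, "Local", "Expert Analyst")
prompt = build_role_prompt(
    role,
    "with deep knowledge about",
    "espresso",
    "How fine should I grind?",
    "Practical Tips (actionable steps)",
    "- Focus on actionable advice and guidance",
)
print(prompt)
```

Keeping the skeleton in one place means a change to the formatting block applies to every perspective at once.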

In [None]:
def get_answer_type_instructions(answer_type):
    '''
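    Return formatting instructions for the given answer type.
    Keys are matched exactly (case-sensitive); unknown types fall back to the "Informative" instructions.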
    '''
    instructions = {
        "Informative": """
        - Provide comprehensive background information and facts
        - Include detailed explanations and context
        - Cover multiple aspects of the topic
        - Use evidence and examples to support points
        - Structure information clearly and logically""",

        "Practical Tips": """
        - Focus on actionable advice and guidance
        - Provide step-by-step instructions where applicable
        - Include specific recommendations and best practices
        - Emphasize what the user can actually do
        - Make suggestions concrete and implementable""",

        "Basic Overview": """
        - Keep explanations simple and accessible
        - Focus on key points and essential information
        - Avoid technical jargon or complex details
        - Provide a clear, easy-to-understand introduction
        - Structure information in a beginner-friendly way""",

        "Expert Analysis": """
        - Provide in-depth professional perspective
        - Include technical details and advanced insights
        - Reference industry standards and best practices
        - Demonstrate specialized knowledge and expertise
        - Address complex aspects and nuances""",

        "Comparison": """
        - Present pros and cons clearly
        - Compare different options or approaches
        - Provide balanced evaluation of alternatives
        - Include recommendations based on comparison
        - Help user understand trade-offs""",

        "Problem-Solving": """
        - Focus on solutions and troubleshooting
        - Address specific issues and challenges
        - Provide practical problem-solving approaches
        - Include preventive measures where applicable
        - Emphasize resolution strategies"""
    }

    return instructions.get(answer_type, instructions["Informative"])
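
The lookup above is case-sensitive, while the routing step lowercases the answer type before matching. An answer type such as "practical tips" would therefore silently fall back to the Informative instructions. A minimal sketch of a case-insensitive lookup follows; `INSTRUCTIONS` is abbreviated and `lookup_instructions` is a hypothetical name.

```python
# Abbreviated table for illustration; the real one has six entries
INSTRUCTIONS = {
    "Informative": "- Provide comprehensive background information and facts",
    "Practical Tips": "- Focus on actionable advice and guidance",
}

# Build a lowercase index once so lookups ignore case
_NORMALIZED = {key.lower(): value for key, value in INSTRUCTIONS.items()}

def lookup_instructions(answer_type, default_key="Informative"):
    """Return instructions for an answer type, ignoring case and any '(...)' suffix."""
    key = answer_type.split("(")[0].strip().lower()
    return _NORMALIZED.get(key, INSTRUCTIONS[default_key])

print(lookup_instructions("practical tips"))
print(lookup_instructions("PRACTICAL TIPS (actionable steps)"))
print(lookup_instructions("Timeline"))
```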

In [None]:
def stance_analysis_enhanced(query, ling_response, expert_response, user_response, topic, stance, answer_type, state=None):
    '''
    Argue a positive or negative stance over the three expert analyses, formatted for the requested answer type.
    '''
    role_1 = state["target_role_map"].get("Local", "Expert Analyst")
    role_2 = state["target_role_map"].get("Expert", "Subject Matter Expert")
    role_3 = state["target_role_map"].get("User Analysis", "General Analyst")

    stance_context = {
        "positive": "highly beneficial, well-founded, and strongly recommended",
        "negative": "problematic, risky, or not advisable"
    }

    stance_description = stance_context.get(stance, stance)
    answer_type_main = answer_type.split('(')[0].strip()

    prompt = f"""You are conducting stance detection analysis for collaborative decision-making.

    Original Query: '''{query}'''
    Topic: {topic}
    Requested Answer Type: {answer_type}

    EXPERT ANALYSES:
    From {role_1}: <<<{ling_response}>>>
    From {role_2}: [[[{expert_response}]]]
    From {role_3}: ---{user_response}---

    YOUR STANCE: You believe the approaches, recommendations, or solutions presented in response to this query are {stance_description} for the user's situation regarding {topic}.

    IMPORTANT: Your argument should be formatted as {answer_type_main.upper()} since that's what the user requested.

    {get_answer_type_instructions(answer_type_main)}

    TASK:
    1. **Analyze all three expert perspectives** through your {stance} lens
    2. **Extract supporting evidence** that supports your {stance} position
    3. **Build your argument** using evidence while following the {answer_type_main} format

    Present your {stance} argument with specific evidence from the expert analyses, formatted according to the user's requested answer type."""

    return get_completion(prompt)

In [None]:
def final_judgement(query, favor_response, against_response, topic):
    '''
    Enhanced final judgement that synthesizes collaborative analysis
    '''
    prompt = f"""You are the final decision-maker in a collaborative analysis system. Your role is to synthesize multiple expert perspectives and opposing viewpoints to provide the best possible response to the user.

    USER QUERY: "{query}"
    TOPIC AREA: {topic}

    COLLABORATIVE ANALYSIS RESULTS:

    POSITIVE PERSPECTIVE (Supporting Arguments):
    {favor_response}

    NEGATIVE PERSPECTIVE (Cautionary Arguments):
    {against_response}

    YOUR TASK:
    Synthesize these collaborative analyses to provide the optimal response to the user's query. This means:

    1. **Evaluate evidence quality**: Assess the strength and credibility of arguments from both sides
    2. **Consider user context**: Focus on what would be most beneficial for someone asking this specific query
    3. **Balance perspectives**: Integrate the strongest insights from both positive and negative analyses
    4. **Provide actionable guidance**: Give the user clear, practical direction

    OUTPUT REQUIREMENTS:
    - Deliver a comprehensive yet concise response
    - Be definitive while acknowledging important considerations
    - Focus on practical value for the user
    - Integrate insights from the collaborative analysis
    - Present as the authoritative answer to their query
    - Give a brief, practical response (1-2 paragraphs).

    Your response should represent the best collective wisdom from the collaborative analysis process."""

    judgement = get_completion(prompt)
    return judgement

In [None]:
def final_judgement_enhanced(query, favor_response, against_response, topic, answer_type):
    '''
    Enhanced final judgement that considers answer type
    '''
    answer_type_main = answer_type.split('(')[0].strip()

    prompt = f"""You are the final decision-maker in a collaborative analysis system. Your role is to synthesize multiple expert perspectives and opposing viewpoints to provide the best possible response to the user.

    USER QUERY: "{query}"
    TOPIC AREA: {topic}
    REQUESTED ANSWER TYPE: {answer_type}

    COLLABORATIVE ANALYSIS RESULTS:

    POSITIVE PERSPECTIVE (Supporting Arguments):
    {favor_response}

    NEGATIVE PERSPECTIVE (Cautionary Arguments):
    {against_response}

    YOUR TASK:
    Synthesize these collaborative analyses to provide the optimal response to the user's query.

    CRITICAL: Format your response as {answer_type_main.upper()} as specifically requested by the user:

    {get_answer_type_instructions(answer_type_main)}

    OUTPUT REQUIREMENTS:
    - Follow the {answer_type_main} format strictly
    - Integrate insights from the collaborative analysis
    - Focus on practical value for the user
    - Be definitive while acknowledging important considerations
    - Present as the authoritative answer to their query

    Your response should represent the best collective wisdom from the collaborative analysis process, delivered in exactly the format the user requested."""

    judgement = get_completion(prompt)
    return judgement
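
The stance and judgement functions suggest a three-call flow: argue the positive stance, argue the negative stance, then synthesize both. The orchestration code is not shown in this section, so the sketch below is an assumption about how the pieces fit together. `run_debate` and `fake_llm` are hypothetical, the prompts are abbreviated, and the LLM call is replaced with a stub so the example runs offline.

```python
def run_debate(query, analyses, complete):
    """Argue both stances over the same analyses, then ask for a verdict.

    `complete` is any callable that maps a prompt string to a response string.
    """
    arguments = {}
    for stance in ("positive", "negative"):
        prompt = (
            f"Query: {query}\n"
            f"Expert analyses: {' | '.join(analyses)}\n"
            f"Argue the {stance} stance using evidence from the analyses."
        )
        arguments[stance] = complete(prompt)
    verdict_prompt = (
        f"Query: {query}\n"
        f"POSITIVE PERSPECTIVE:\n{arguments['positive']}\n"
        f"NEGATIVE PERSPECTIVE:\n{arguments['negative']}\n"
        "Synthesize both perspectives into one answer."
    )
    return complete(verdict_prompt)

calls = []

def fake_llm(prompt):
    """Stub standing in for the real completion call."""
    calls.append(prompt)
    return f"response {len(calls)}"

result = run_debate("Should I learn Rust?", ["analysis A", "analysis B", "analysis C"], fake_llm)
print(result)
print(len(calls))
```

Passing the completion function in as a parameter keeps the flow testable without a live model.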


In [None]:
def summary_synthesis(working_query, ling_response, expert_response, user_response, topic, selected_answer_type, state):
    '''
    Synthesize multiple expert analyses into comprehensive factual response for informational queries.
    Extracts consensus and differing viewpoints, maintains objectivity, formats according to requested answer type without forcing recommendations.
    '''
    role_1 = state["target_role_map"].get("Local", "Expert Analyst")
    role_2 = state["target_role_map"].get("Expert", "Subject Matter Expert")
    role_3 = state["target_role_map"].get("User Analysis", "General Analyst")
    answer_type_main = selected_answer_type.split('(')[0].strip()

    prompt = f"""You are a Summary Synthesis Agent. Your role is to extract and synthesize the most relevant and important information from multiple expert analyses.

    USER QUERY: "{working_query}"
    TOPIC: {topic}
    REQUESTED ANSWER TYPE: {selected_answer_type}

    EXPERT ANALYSES:
    From {role_1}: <<<{ling_response}>>>
    From {role_2}: [[[{expert_response}]]]
    From {role_3}: ---{user_response}---

    YOUR TASK:
    Synthesize these expert perspectives into a comprehensive, factual response that directly answers the user's query.

    IMPORTANT: Format your response as {answer_type_main.upper()}:

    {get_answer_type_instructions(answer_type_main)}

    SYNTHESIS GUIDELINES:
    1. **Extract key facts**: Pull the most important information from all three experts
    2. **Identify consensus**: Where experts agree, present this as reliable information
    3. **Note different perspectives**: Where experts offer different viewpoints, present both
    4. **Stay factual**: Focus on information, not recommendations
    5. **Be comprehensive**: Cover all important aspects mentioned by the experts
    6. **Maintain objectivity**: Present information neutrally without bias
    7. **Structure clearly**: Organize information logically for easy understanding

    DO NOT:
    - Force recommendations when the user wants information
    - Create artificial pros/cons lists for factual queries
    - Add opinions where experts provided facts
    - Turn factual content into advice

    Provide a well-structured synthesis that gives the user exactly the type of information they requested."""

    return get_completion(prompt)

In [None]:
def solution_synthesis(working_query, ling_response, expert_response, user_response, topic, selected_answer_type, state):
    '''
    Synthesize multiple expert analyses into practical, actionable solutions for solution-focused queries.
    Extracts actionable advice from expert perspectives, prioritizes effective approaches, provides clear implementation guidance.
    '''
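    # Use the same default role names as summary_synthesis so a missing role never renders as "None"
    role_1 = state["target_role_map"].get("Local", "Expert Analyst")
    role_2 = state["target_role_map"].get("Expert", "Subject Matter Expert")
    role_3 = state["target_role_map"].get("User Analysis", "General Analyst")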
    answer_type_main = selected_answer_type.split('(')[0].strip()

    prompt = f"""You are a Solution Synthesis Agent. Your role is to synthesize expert analyses into practical, actionable solutions.

    USER QUERY: "{working_query}"
    TOPIC: {topic}
    REQUESTED ANSWER TYPE: {selected_answer_type}

    EXPERT ANALYSES:
    From {role_1}: <<<{ling_response}>>>
    From {role_2}: [[[{expert_response}]]]
    From {role_3}: ---{user_response}---

    YOUR TASK:
    Synthesize these expert perspectives into a practical, solution-focused response.

    IMPORTANT: Format your response as {answer_type_main.upper()}:

    {get_answer_type_instructions(answer_type_main)}

    SOLUTION GUIDELINES:
    1. **Identify the core need**: What is the user trying to accomplish?
    2. **Extract actionable advice**: Pull practical steps and recommendations from experts
    3. **Prioritize solutions**: Present the most effective approaches first
    4. **Consider implementation**: Include practical considerations for execution
    5. **Address potential challenges**: Note important limitations or considerations
    6. **Provide clear guidance**: Make recommendations specific and actionable
    7. **Focus on outcomes**: Help user understand what success looks like

    Focus on giving the user a clear path forward based on the expert analyses."""

    return get_completion(prompt)
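
The routing step produces a label that selects one of the synthesis paths. The dictionary path returns lowercase labels (`method.lower()`), while the keyword fallback returns uppercase ones, so normalizing the label before dispatch avoids a silent mismatch. The sketch below assumes a simple label-to-path table. `SYNTHESIS_PATHS` and `pick_synthesis` are hypothetical, and the values are descriptive strings where real code would hold callables.

```python
# Label -> synthesis path (strings here; callables in real code)
SYNTHESIS_PATHS = {
    "informational": "summary_synthesis",
    "solution_focused": "solution_synthesis",
    "decision_making": "stance analysis + final judgement",
}

def pick_synthesis(method, paths=SYNTHESIS_PATHS, default="informational"):
    """Select a synthesis path from a routing label, ignoring case and whitespace."""
    key = (method or "").strip().lower()
    return paths.get(key, paths[default])

print(pick_synthesis("SOLUTION_FOCUSED"))
print(pick_synthesis("decision_making"))
print(pick_synthesis(None))
```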

Test LLM Connection

In [None]:
# Test your private LLM
def test_private_llm():
    response = client.chat.completions.create(
        model="llama-3-8b-instruct",
        messages=[
            {"role": "user", "content": "Explain how AI works in a few words"}
        ]
    )
    print("Private LLM Response:", response.choices[0].message.content)

test_private_llm()

Private LLM Response: "Machine learns patterns in data, mimics human thinking"


## RAG System Architecture

### Enhanced RAG Processing Pipeline

**RAG Architecture Insights:**
1. **Search Integration**: Uses Google Search API for current information
2. **Source Attribution**: Preserves and displays source URLs
3. **Graceful Fallback**: Handles empty search results
4. **Structured Data Processing**: Extracts titles and snippets

In [None]:
'''
Initialize Google Search API wrapper for web search functionality.
Creates search tool object for retrieving current web information and sources.
'''
search_tool = GoogleSearchAPIWrapper()

In [None]:
def simple_rag_with_private_llm(query):
    '''
    Perform RAG (Retrieval-Augmented Generation) using web search to validate and enhance expert recommendations.
    Searches Google for current information, uses LLM to analyze results against original recommendation, returns enhanced response with source URLs.
    '''
    print(f"RAG processing query: {query[:100]}...")

    try:
        # Use the full query text as the search query (no term extraction is applied yet)
        search_query = query
        print(f"Search query: {search_query}")

        # Perform web search using structured results
        raw_results = search_tool.results(search_query, num_results=5)

        # Extract URLs from structured search results
        source_urls = [r['link'] for r in raw_results if 'link' in r]
        print(f"Extracted {len(source_urls)} source URLs:")
        for url in source_urls:
            print(f"  - {url}")

        # Generate a readable text summary for the LLM
        search_results = "\n".join(
            f"{r['title']}: {r['snippet']}"
            for r in raw_results
            if 'title' in r and 'snippet' in r
        )

        print(f"Search results preview:\n{search_results[:300]}...")

        # Check if search was successful
        if not search_results.strip():
            print("No substantial search results found.")
            return "Based on current information, the original expert recommendation remains valid.\n\n---\n\n📚 **Note:** Web search was performed but no relevant results were found."

        # Enhanced RAG prompt
        rag_prompt = f"""Based on the search results, enhance and validate the expert recommendation.

        ORIGINAL EXPERT RECOMMENDATION: {query}

        CURRENT SEARCH RESULTS:
        {search_results}

        TASK: Use these search results to validate, update, and enhance the expert recommendation. Focus on:
        - Current accuracy of the information
        - Recent developments or changes
        - Specific details that improve the recommendation
        - Any corrections needed based on current data

        Provide a clear, enhanced recommendation that incorporates the latest information."""

        response = client.chat.completions.create(
            model="llama-3-8b-instruct",
            messages=[
                {"role": "system", "content": "You are a research analyst who validates expert recommendations using current search results. Do not include URLs or sources in your response, and do not mention that they are omitted; sources are appended separately."},
                {"role": "user", "content": rag_prompt}
            ],
            temperature=0
        )

        enhanced_response = response.choices[0].message.content

        # Format response with improved source display
        final_response_with_sources = format_response_with_sources(enhanced_response, source_urls)

        return final_response_with_sources

    except Exception as e:
        print(f"RAG search error: {e}")
        import traceback
        traceback.print_exc()
        return f"Unable to retrieve current information for validation.\n\n---\n\n⚠️ **Error Details:** {str(e)}"


In [None]:
def execute_rag_update(chatbot_history, state):
    '''
    Execute RAG enhancement on previous analysis with comprehensive validation and error handling.
    Validates session state, retrieves current web information to update analysis, manages UI states, includes fallback mechanisms.
    '''
    try:
        print("🔄 RAG Enhancement: Starting state-based processing...")

        # ========================================
        # VALIDATION 1: Check if state exists and is properly structured
        # ========================================
        if not state or not isinstance(state, dict):
            print("❌ No valid state found")
            error_msg = "❌ **No active session found.** Please start a new query to use RAG enhancement."
            error_message = {"role": "assistant", "content": error_msg}
            yield chatbot_history + [error_message], state or {}, ""
            return

        # ========================================
        # VALIDATION 2: Check if user has submitted a query through CollabSearch
        # ========================================
        original_query = state.get("original_query", "")
        working_query = state.get("working_query", "")
        processing_query_id = state.get("processing_query_id")

        # Enhanced validation for pre-query clicks
        if not original_query and not working_query and not processing_query_id:
            print("❌ User clicked RAG before submitting any query")
            error_msg = """❌ **No query to enhance yet!**

                        Please follow these steps:
                        1. 📝 **Submit your question** in the text box above
                        2. 🎯 **Select your preferred topic focus** (Option 1, 2, or 3)
                        3. 📋 **Choose your answer type** (Option 1, 2, or 3)
                        4. ⏳ **Wait for the analysis to complete**
                        5. 🔄 **Then click this button** to enhance with current information

                        *The RAG enhancement works best after you've received an initial analysis.*"""

            error_message = {"role": "assistant", "content": error_msg}
            yield chatbot_history + [error_message], state, ""
            return

        # ========================================
        # VALIDATION 3: Check if CollabSearch processing is complete
        # ========================================
        if original_query and not working_query:
            print("⚠️ Query exists but CollabSearch processing may be incomplete")
            error_msg = """⚠️ **Analysis still in progress!**

                        Your query is being processed through the CollabSearch framework. Please:
                        - 🎯 **Complete the topic selection** if prompted
                        - 📋 **Complete the answer type selection** if prompted
                        - ⏳ **Wait for the initial analysis to finish**

                        *RAG enhancement will be available once the analysis is complete.*"""

            error_message = {"role": "assistant", "content": error_msg}
            yield chatbot_history + [error_message], state, ""
            return

        # ========================================
        # VALIDATION 4: Check if there's content to enhance
        # ========================================
        if not chatbot_history or len(chatbot_history) == 0:
            print("❌ No chat history to enhance")
            error_msg = "❌ **No conversation history found.** Please submit a query first, then use RAG enhancement."
            error_message = {"role": "assistant", "content": error_msg}
            yield chatbot_history + [error_message], state, ""
            return

        # Get the most recent assistant response to enhance
        last_assistant_response = ""
        for msg in reversed(chatbot_history):
            if msg.get("role") == "assistant" and msg.get("content"):
                content = msg.get("content", "")
                # Skip RAG-related messages to get the actual analysis
                # ("⚠️" is included so the in-progress warning is not mistaken for an analysis)
                if not content.startswith(("🔄", "✅", "❌", "⚠️")):
                    last_assistant_response = content
                    break

        if not last_assistant_response:
            print("❌ No assistant response found to enhance")
            error_msg = """❌ **No analysis found to enhance.**

                        Please ensure you have:
                        1. ✅ **Submitted a complete query**
                        2. ✅ **Received an analysis response**
                        3. ✅ **Completed the CollabSearch framework process**

                        *Then try the RAG enhancement again.*"""

            error_message = {"role": "assistant", "content": error_msg}
            yield chatbot_history + [error_message], state, ""
            return

        # ========================================
        # SUCCESSFUL VALIDATION: Proceed with RAG
        # ========================================
        print(f"✅ Validation passed - enhancing query: '{original_query[:50]}...'")
        print(f"📝 Working query: '{working_query[:50]}...'")
        print(f"📊 Last response length: {len(last_assistant_response)} characters")

        # Show processing message with helpful context
        processing_msg = f"""🔄 **Enhancing your analysis with current information...**

                        **Your Query:** {original_query[:100]}{'...' if len(original_query) > 100 else ''}

                        🔍 Searching for the latest information to update and validate the analysis..."""

        processing_message = {"role": "assistant", "content": processing_msg}
        temp_history = chatbot_history + [processing_message]

        yield temp_history, state, ""

        # ========================================
        # RAG ENHANCEMENT: Use state data directly
        # ========================================
        print("🤖 Starting RAG enhancement with state data...")

        try:
            enhanced_response = enhanced_rag_with_session_state(
                original_query=original_query,
                working_query=working_query,
                previous_analysis=last_assistant_response,
                state=state
            )

            print("✅ RAG enhancement completed successfully")

            # Store RAG results in state for this session
            state["rag_enhanced_response"] = enhanced_response
            state["rag_timestamp"] = time.time()
            state["rag_original_query"] = original_query

        except Exception as rag_error:
            print(f"❌ RAG processing failed: {rag_error}")
            rag_succeeded = False
            enhanced_response = f"""⚠️ **RAG Enhancement Notice:**

                            The current information lookup encountered an issue: {str(rag_error)}

                            **Your original analysis remains valid and complete.** This enhancement failure doesn't affect the quality of the previous response.

                            *You can try the enhancement again or continue with the existing analysis.*"""
        else:
            rag_succeeded = True

        # ========================================
        # PRESENT ENHANCED RESULTS
        # ========================================
        if rag_succeeded:
            success_msg = f"""✅ **Analysis Enhanced with Current Information!**

                            {enhanced_response}

                            ---
                            *💡 This response combines your original analysis with the latest available information for accuracy and relevance.*"""
        else:
            # Show the failure notice as-is rather than under the success header
            success_msg = enhanced_response

        final_message = {"role": "assistant", "content": success_msg}
        updated_history = chatbot_history + [final_message]

        # ========================================
        # OPTIONAL: Background database logging (non-blocking)
        # ========================================
        try:
            if processing_query_id:
                # This is just for logging - doesn't affect RAG functionality
                background_database_logging(processing_query_id, enhanced_response)
        except Exception as db_error:
            print(f"⚠️ Database logging failed (non-critical): {db_error}")
            # Don't show this error to user - it's just logging

        yield updated_history, state, ""

    except Exception as e:
        print(f"❌ Critical error in execute_rag_update: {e}")
        import traceback
        traceback.print_exc()

        # Comprehensive error message for users
        error_msg = f"""❌ **Enhancement Error**

                                An unexpected error occurred during RAG enhancement: `{str(e)}`

                                **Your original analysis is still available** in the conversation above. You can:
                                - 📋 **Continue using the existing analysis**
                                - 🔄 **Try enhancement again** in a few moments
                                - 💬 **Submit a new query** if needed

                                *This error has been logged for improvement.*"""

        error_message = {"role": "assistant", "content": error_msg}
        updated_history = chatbot_history + [error_message]
        yield updated_history, state or {}, ""
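
The history scan in the handler above can be exercised on its own. The following is a minimal sketch, assuming messages are dicts with `role` and `content` keys as in the handler. The names `find_last_analysis` and `STATUS_PREFIXES` are hypothetical and do not appear elsewhere in the notebook; the prefix tuple also lists "⚠️", which the in-progress warning uses.

```python
# Hypothetical helper (not part of the notebook): newest-first scan for the
# last assistant message that is an analysis rather than a status bubble.
STATUS_PREFIXES = ("🔄", "✅", "❌", "⚠️")  # prefixes used by status/error messages


def find_last_analysis(chatbot_history):
    """Return the newest assistant message that is not a status bubble, or ""."""
    for msg in reversed(chatbot_history or []):
        content = msg.get("content") or ""
        if (
            msg.get("role") == "assistant"
            and content
            and not content.startswith(STATUS_PREFIXES)  # str.startswith accepts a tuple
        ):
            return content
    return ""


history = [
    {"role": "user", "content": "Is remote work productive?"},
    {"role": "assistant", "content": "Remote work shows mixed results."},
    {"role": "assistant", "content": "❌ **No query to enhance yet!**"},
]
print(find_last_analysis(history))
```

Because the scan runs newest-first, the error bubble at the end of `history` is skipped and the earlier analysis is returned.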

In [None]:
def get_last_query_data():
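    '''
    Get the most recent query from the CSV database, preferring completed queries.
    Returns (query_id, rewritten_query, final_response, original_query), or four Nones if nothing is available.
    '''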
    try:
        if not os.path.exists('cola_database.csv'):
            return None, None, None, None

        df = pd.read_csv('cola_database.csv')
        if df.empty:
            return None, None, None, None

        # Debug: Print column names to see what we actually have
        print(f"DEBUG - Database columns: {list(df.columns)}")

        # Try completed queries first, fallback to any query
        completed_queries = df[df['Status'] == 'completed']
        if not completed_queries.empty:
            last_query = completed_queries.iloc[-1]
        else:
            last_query = df.iloc[-1]

        query_id = int(last_query['ID'])

        if 'Original_Query' in df.columns and 'Rewritten_Query' in df.columns:
            # New schema
            print("DEBUG - Using new schema (Original_Query, Rewritten_Query)")
            original_query = str(last_query['Original_Query']) if pd.notna(last_query['Original_Query']) else "Original query not available"
            rewritten_query = str(last_query['Rewritten_Query']) if pd.notna(last_query['Rewritten_Query']) else original_query
        elif 'Query' in df.columns:
            # Old schema - fallback to 'Query' column
            print("DEBUG - Using old schema (Query)")
            query_value = str(last_query['Query']) if pd.notna(last_query['Query']) else "Query not available"
            original_query = query_value
            rewritten_query = query_value  # Use same value for both
        else:
            print("DEBUG - No recognized query columns found")
            print(f"Available columns: {list(df.columns)}")
            return None, None, None, None

        # Handle different possible column names for final response
        if 'Final Judgement' in df.columns and pd.notna(last_query['Final Judgement']):
            final_response = str(last_query['Final Judgement'])
        elif 'Final_Judgement' in df.columns and pd.notna(last_query['Final_Judgement']):
            final_response = str(last_query['Final_Judgement'])
        else:
            final_response = rewritten_query

        print(f"DEBUG - Returning: query_id={query_id}, rewritten_query='{rewritten_query[:50]}...', original_query='{original_query[:50]}...'")
        return query_id, rewritten_query, final_response, original_query

    except Exception as e:
        print(f"Error retrieving last query data: {e}")
        import traceback
        traceback.print_exc()
        return None, None, None, None
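
The old/new schema fallback used above can be illustrated without pandas. This is a sketch only, assuming each row is a plain dict such as `csv.DictReader` would produce; `pick_queries` is a hypothetical name. With pandas rows, missing values are `NaN`, which is truthy, so the `pd.notna` checks in the function above are still needed there.

```python
def pick_queries(row):
    """Return (original, rewritten) from a row under either schema, or None.

    Hypothetical helper: `row` is a plain dict; empty strings count as missing.
    """
    if "Original_Query" in row and "Rewritten_Query" in row:
        # New schema: fall back to the original query when no rewrite was stored
        original = row["Original_Query"] or "Original query not available"
        rewritten = row["Rewritten_Query"] or original
        return original, rewritten
    if "Query" in row:
        # Old schema: a single column serves as both values
        value = row["Query"] or "Query not available"
        return value, value
    return None


print(pick_queries({"Original_Query": "best laptops", "Rewritten_Query": ""}))
print(pick_queries({"Query": "best laptops 2024"}))
print(pick_queries({"ID": "1"}))
```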

In [None]:
def format_response_with_sources(enhanced_response, source_urls):
    '''
    Format the response with clear, visible source links
    '''
    if not source_urls:
        sources_section = "\n\n---\n\n📚 **Note:** Search was performed but specific source URLs could not be extracted. Information has been validated against current web content."
    else:
        sources_section = "\n\n---\n\n📚 **Sources Used for This Update:**\n\n"

        from urllib.parse import urlparse

        for i, url in enumerate(source_urls, 1):
            try:
                parsed = urlparse(url)
                # Scheme-less input ends up in .path, so fall back to it
                domain = parsed.netloc if parsed.netloc else parsed.path
                if domain.startswith('www.'):
                    domain = domain[4:]

                # Format as clickable link
                sources_section += f"{i}. [{domain}]({url})\n"
            except Exception:
                # Fall back to the raw URL if it cannot be parsed
                sources_section += f"{i}. {url}\n"

        sources_section += "\n*Click on the links above to visit the sources.*"

    return enhanced_response + sources_section
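
The domain-label step above relies on how `urllib.parse.urlparse` splits a URL. A small sketch of that behaviour follows; `domain_label` and `markdown_source_list` are hypothetical helper names, and the URLs are placeholders.

```python
from urllib.parse import urlparse


def domain_label(url):
    """Return a short display label: the host without a leading 'www.'."""
    parsed = urlparse(url)
    # Input without a scheme (e.g. "example.com/page") has an empty netloc
    # and lands entirely in .path, hence the fallback.
    domain = parsed.netloc or parsed.path
    return domain[4:] if domain.startswith("www.") else domain


def markdown_source_list(urls):
    """Build a numbered Markdown list of clickable source links."""
    return "\n".join(f"{i}. [{domain_label(u)}]({u})" for i, u in enumerate(urls, 1))


print(markdown_source_list([
    "https://www.example.com/a",
    "http://docs.example.org/b?x=1",
]))
```

Note that for scheme-less input the label keeps the path portion, since `urlparse` cannot tell where the host ends.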

In [None]:
def enhanced_rag_with_session_state(original_query, working_query, previous_analysis, state):
    '''
    Enhance previous analysis with current web information using session state without database dependency.
    Uses working query for better search results, validates/updates previous analysis, stores sources in state.
    '''
    start_time = time.time()

    print("🎯 RAG Context:")
    print(f"   📝 Original: {original_query}")
    print(f"   🔄 Working: {working_query}")
    print(f"   📊 Previous analysis: {len(previous_analysis)} chars")

    try:
        # Use the working query (rewritten/contextualized) for better RAG results
        query_for_rag = working_query if working_query else original_query

        # Create RAG prompt that leverages the previous analysis
        rag_prompt = f"""Based on this answer:

                {previous_analysis[:1000]}...

                Please provide updated, current information that validates, corrects, or expands upon this answer for the query: "{query_for_rag}"

                Focus on:
                - Latest developments or changes
                - Current accuracy of the information
                - Recent data or statistics
                - Any new perspectives or considerations"""

        print("🔍 Calling RAG system...")
        enhanced_response = simple_rag_with_private_llm(rag_prompt)

        # Extract and log source information
        source_urls = extract_source_urls_from_response(enhanced_response)
        print(f"📚 Found {len(source_urls)} sources")

        # Store sources in state
        if source_urls:
            state["rag_sources"] = source_urls

        # Processing time
        processing_time = round(time.time() - start_time, 2)
        print(f"⏱️ RAG completed in {processing_time} seconds")

        return enhanced_response

    except Exception as e:
        print(f"❌ RAG processing error: {e}")
        raise  # Re-raise with the original traceback; handled by the calling function

In [None]:
def extract_source_urls_from_response(response_text):
    '''
    Extract source URLs from RAG response text using regex patterns for transparency and source tracking.
    Finds URLs in various formats (parentheses, brackets, after "Source:"), removes duplicates, returns clean URL list.
    '''
    if not response_text or not isinstance(response_text, str):
        return []

    # Common patterns for URLs in RAG responses
    patterns = [
        r'\((https?://[^\s\)]+)\)',  # URLs in parentheses
        r'Source: (https?://\S+)',   # URLs after "Source:"
        r'\[(https?://[^\s\]]+)\]',  # URLs in brackets
        r'https?://\S+',             # Any standalone URLs
    ]

    urls = []
    for pattern in patterns:
        for url in re.findall(pattern, response_text):
            # The \S+ patterns also capture trailing punctuation and closing
            # brackets, which would otherwise create near-duplicate entries
            urls.append(url.rstrip('.,;:!?)]>"\''))

    # Remove duplicates while preserving first-seen order
    return list(dict.fromkeys(urls))
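
To see why trailing punctuation and ordering matter for source extraction, here is a compact standalone sketch. It is an illustration under assumptions rather than the notebook's implementation: `extract_urls` and `URL_PATTERN` are hypothetical names, and a single pattern that stops at whitespace or a closing bracket replaces the four patterns above.

```python
import re

# Hypothetical single-pattern variant: stop at whitespace or a closing bracket
URL_PATTERN = re.compile(r"https?://[^\s\)\]>]+")


def extract_urls(text):
    """Return unique URLs in first-seen order, without trailing punctuation."""
    if not isinstance(text, str):
        return []
    cleaned = (match.rstrip(".,;:!?") for match in URL_PATTERN.findall(text))
    # dict.fromkeys removes duplicates while keeping insertion order
    return list(dict.fromkeys(cleaned))


sample = (
    "See [report](https://example.com/report) and Source: https://example.org/data. "
    "Again: https://example.com/report"
)
print(extract_urls(sample))
```

Using `dict.fromkeys` instead of `set` keeps the sources in the order they appear in the response, so the numbered list shown to the user is stable between runs.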

In [None]:
def background_database_logging(query_id, enhanced_response):
    '''
    Optional background logging to database
    This runs independently and doesn't affect RAG functionality
    '''
    try:
        import pandas as pd

        if not os.path.exists('cola_database.csv'):
            print("⚠️ Database file not found - skipping logging")
            return

        # Simple database update for logging
        df = pd.read_csv('cola_database.csv')
        mask = df['ID'] == query_id

        if mask.any():
            df.loc[mask, 'After RAG Agent'] = str(enhanced_response)[:1000]  # Truncate for storage
            df.loc[mask, 'RAG_Timestamp'] = time.time()
            df.to_csv('cola_database.csv', index=False)
            print(f"📊 Logged RAG results for query {query_id}")
        else:
            print(f"⚠️ Query {query_id} not found for logging")

    except Exception as e:
        print(f"⚠️ Database logging failed: {e}")
        # Don't raise - this is non-critical logging

# **CollabSearch** **MAIN**

In [None]:
def add_predictions_sequential_intelligent(original_query, selected_intent, selected_answer_type, query_id, state):
    '''
    Main CollabSearch framework orchestrator that processes queries through the complete multi-stage analysis pipeline.
    Coordinates query rewriting, routing, role assignment, expert analysis, synthesis, and database logging with duplicate processing prevention.
    '''
    start_time = time.time()

    print(f"\n🔒 PROCESSING: Query {query_id}")
    print(f"📝 Query: {original_query}")
    print(f"🎯 Intent: {selected_intent}")
    print(f"📄 Answer Type: {selected_answer_type}")
    print("="*70)

    # CRITICAL: Check if this query is already being processed
    if state.get("currently_processing") == query_id:
        print(f"⚠️ DUPLICATE PROCESSING PREVENTED for query {query_id}")
        return "Processing already in progress for this query."

    # Mark as currently processing
    state["currently_processing"] = query_id

    try:
        # STEP 1: Query rewriting
        working_query = rewrite_query_with_dual_intent(original_query, selected_intent, selected_answer_type, state)
        state["working_query"] = working_query
        print(f"✏️ Rewritten Query: {working_query}")

        # STEP 2: Routing
        synthesis_method, routing_info = route_synthesis_with_comprehensive_tracking(
            selected_answer_type=selected_answer_type,
            original_query=original_query,
            rewritten_query=working_query,
            query_id=query_id
        )
        print(f"🎯 Synthesis Method: {synthesis_method.upper()}")

        # STEP 3: Role assignment
        definition_list = get_roles(working_query, selected_intent)
        topic = definition_list[0]
        assigned_roles = definition_list[1:]
        state["topic"] = topic
        define_roles(definition_list, state)

        print(f"🏷️ Topic: {topic}")
        print(f"👥 Roles: {assigned_roles}")

        # STEP 4: Expert analysis
        ling_response = local_analysis_enhanced(working_query, topic, selected_answer_type, state)
        expert_response = expert_analysis_enhanced(working_query, topic, selected_answer_type, state)
        user_response = user_analysis_enhanced(working_query, topic, selected_answer_type, state)

        # STEP 5: Synthesis
        final_response = ""
        favor_response = ""
        against_response = ""

        print(f"⚙️ SYNTHESIS: {synthesis_method.upper()}")

        if synthesis_method == 'decision_making':
            favor_response = stance_analysis_enhanced(working_query, ling_response, expert_response, user_response, topic, "positive", selected_answer_type, state)
            against_response = stance_analysis_enhanced(working_query, ling_response, expert_response, user_response, topic, "negative", selected_answer_type, state)
            final_response = final_judgement_enhanced(working_query, favor_response, against_response, topic, selected_answer_type)
        elif synthesis_method == 'solution_focused':
            final_response = solution_synthesis(working_query, ling_response, expert_response, user_response, topic, selected_answer_type, state)
        else:  # informational
            final_response = summary_synthesis(working_query, ling_response, expert_response, user_response, topic, selected_answer_type, state)

        # STEP 6: Database update
        end_time = time.time()
        processing_time = round(end_time - start_time, 2)

        comprehensive_results = {
            'Original_Query': original_query,
            'Rewritten_Query': working_query,
            'Selected_Topic_Intent': selected_intent,
            'Selected_Answer_Type': selected_answer_type,
            'Routing_Type': routing_info['routing_type'],
            'Routing_Method': routing_info['routing_method'],
            'Dictionary_Match': routing_info.get('dictionary_match', ''),
            'Identified_Topic': topic,
            'Assigned_Roles': json.dumps(assigned_roles),
            'Linguist_Analysis': ling_response,
            'Expert_Analysis': expert_response,
            'User_Analysis': user_response,
            'In_Favor': favor_response,
            'Against': against_response,
            'Final_Judgement': final_response,
            'Synthesis_Method': synthesis_method,
            'Processing_Time_Seconds': processing_time,
        }

        update_comprehensive_results(query_id, comprehensive_results)

        print(f"✅ PROCESSING COMPLETE: {processing_time}s")
        print("="*70)

        return final_response

    except Exception as e:
        print(f"❌ PROCESSING ERROR: {e}")
        import traceback
        traceback.print_exc()
        return f"❌ **Error during analysis:** {str(e)}"

    finally:
        # CRITICAL: Clear processing lock
        if state.get("currently_processing") == query_id:
            state["currently_processing"] = None
            print(f"🔓 Processing lock cleared for query {query_id}")
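
The duplicate-processing guard above boils down to a marker in the state dict that is set before the work starts and cleared in a `finally` block. A minimal sketch of that pattern, with `run_with_lock` as a hypothetical name and a callable standing in for the pipeline:

```python
def run_with_lock(state, query_id, work):
    """Run work() unless query_id is already marked as in progress in state."""
    if state.get("currently_processing") == query_id:
        return "Processing already in progress for this query."

    state["currently_processing"] = query_id
    try:
        return work()
    except Exception as exc:
        return f"Error during analysis: {exc}"
    finally:
        # Runs after both the normal return and the error return,
        # so the marker is never left set for this query
        if state.get("currently_processing") == query_id:
            state["currently_processing"] = None


state = {}
print(run_with_lock(state, 1, lambda: "done"))
print(state["currently_processing"])
```

This is a marker in a plain dict, not a thread-safe lock: it prevents re-entry for the same query ID within one session, but it would need a real synchronization primitive if the same state were shared across threads.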


# UI MAIN

In [None]:
def slow_echo_with_dual_intent_disambiguation_dynamic(message, chatbot_history, state):
    '''
    Main chatbot entry point that handles user queries with dual-intent disambiguation while preserving chat history.
    Resets processing state, generates unique query ID, initiates topic disambiguation, manages UI state transitions.
    '''

    # Initialize state if None
    if not state:
        state = {}

    # Reset only processing variables, preserve chat history
    processing_reset = {
        "query_id": None,
        "original_query": "",
        "intent_options": [],
        "step": "topic_selection",
        "processing_query_id": None,
        "answer_type_options": [],
        "selected_topic_intent": "",
        "working_query": "",
        "topic": "",
        "target_role_map": {},
        "selected_answer_type": "",
        "processing": False,
        "waiting_for_custom": False,
        "custom_input_type": "",
        "show_options": False,
        "show_main_input": True,
        "currently_processing": None
    }

    # Update state with reset values
    state.update(processing_reset)

    # Generate a unique query ID from the current time in microseconds
    # (time is already imported at the top of the notebook)
    current_id = int(time.time() * 1000000)

    # Set fresh state for this query
    state["query_id"] = current_id
    state["original_query"] = message
    state["step"] = "topic_selection"
    state["show_options"] = True
    state["processing"] = False

    # Preserve existing chat history properly
    if chatbot_history is None:
        chatbot_history = []

    # Add user message to existing history (don't replace it)
    user_message = {"role": "user", "content": message}
    updated_history = chatbot_history + [user_message]

    # Initial yield with clean UI state
    yield updated_history, state, "", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)

    try:
        print("🚀 STARTING QUERY PROCESSING")
        print(f"📋 Query ID: {current_id}")
        print(f"📝 Query: {message}")
        print(f"📚 Existing chat history: {len(chatbot_history)} messages")

        # Add query to database
        add_new_query(current_id, message)

        # Generate topic intent options
        topic_intent_options = generate_intent_options(message)
        print(f"🎯 Generated options: {topic_intent_options}")

        # Store in state
        state["intent_options"] = topic_intent_options

        # Show intent options WITHOUT removing user message
        final_history, _ = show_intent_options_clean(updated_history, state, topic_intent_options)

        # Show options UI
        yield final_history, state, "", gr.update(visible=True), gr.update(visible=True), gr.update(visible=False)

    except Exception as e:
        error_message = f"❌ **Error in intent disambiguation:** {str(e)}"
        print(f"❌ Error: {e}")

        error_history = updated_history + [{"role": "assistant", "content": error_message}]
        state["show_options"] = False
        state["processing"] = False

        yield error_history, state, "", gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)
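
The reset step at the start of the entry point overwrites only the per-query keys with `dict.update`, so anything else stored in the state survives into the next query. A reduced sketch, assuming a much smaller set of defaults than the real one; `PROCESSING_DEFAULTS` and `start_new_query` are hypothetical names, and unlike the function above this version copies the state instead of mutating it in place.

```python
import time

# Hypothetical, shortened version of the per-query defaults
PROCESSING_DEFAULTS = {
    "original_query": "",
    "working_query": "",
    "step": "topic_selection",
    "processing": False,
}


def start_new_query(state, message):
    """Return a copy of state with per-query keys reset and a fresh query ID."""
    new_state = dict(state or {})
    new_state.update(PROCESSING_DEFAULTS)        # resets only the listed keys
    new_state["query_id"] = time.time_ns() // 1000  # microseconds since the epoch
    new_state["original_query"] = message
    return new_state


previous = {"rag_sources": ["https://example.com"], "working_query": "old rewrite"}
fresh = start_new_query(previous, "What is RAG?")
print(fresh["working_query"] == "", fresh["rag_sources"])
```

One consequence worth noting: keys that are not part of the reset, such as `rag_sources` here, carry over from the previous query until something overwrites them.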

In [None]:
def view_database_stats():
    """View database statistics"""
    try:
        df = pd.read_csv('cola_database.csv')

        total_queries = len(df)
        completed = len(df[df['Status'] == 'completed'])
        pending = len(df[df['Status'] == 'pending'])
        errors = len(df[df['Status'] == 'error'])

        # Calculate average processing time for completed queries
        completed_df = df[df['Status'] == 'completed']
        if not completed_df.empty and 'Processing_Time_Seconds' in completed_df.columns:
            # Filter out NaN values before calculating mean
            processing_times = completed_df['Processing_Time_Seconds'].dropna()
            if not processing_times.empty:
                avg_time = processing_times.mean()
                avg_time_str = f"- Average processing time: {avg_time:.2f} seconds"
            else:
                avg_time_str = "- Average processing time: N/A"
        else:
            avg_time_str = "- Average processing time: N/A"

        stats = f"""
        Database Statistics:
        - Total queries: {total_queries}
        - Completed: {completed}
        - Pending: {pending}
        - Errors: {errors}
        {avg_time_str}

        Database Schema: {list(df.columns)}

        Recent queries:
        """

        if not df.empty:
            # Handle both old and new schema for display
            if 'Original_Query' in df.columns and 'Rewritten_Query' in df.columns:
                # New schema
                display_columns = ['ID', 'Original_Query', 'Rewritten_Query', 'Status', 'Processing_Time_Seconds', 'Timestamp']
            else:
                # Old schema
                display_columns = ['ID', 'Query', 'Status', 'Processing_Time_Seconds', 'Timestamp']

            # Only show columns that actually exist
            existing_columns = [col for col in display_columns if col in df.columns]
            recent = df.tail(5)[existing_columns]
            stats += recent.to_string(index=False)

        return stats
    except Exception as e:
        # Avoid re-reading the CSV here: a second read could raise inside the handler
        if not os.path.exists('cola_database.csv'):
            return f"Error reading database: {e}\n\nFile not found"
        return f"Error reading database: {e}"

## Option Handlers

In [None]:
def handle_option_1_click_enhanced_dynamic(chatbot_history, state):
    '''
    Handle Option 1 button clicks in two-step disambiguation process (topic selection then answer type selection).
    Manages state transitions, cleans chat history, executes CollabSearch framework processing, and controls UI element visibility.
    '''
    if not isinstance(state, dict):
        state = {"step": "topic_selection", "intent_options": [], "answer_type_options": [],
                "selected_topic_intent": "", "query_id": None, "original_query": "",
                "show_options": False, "processing": False, "show_main_input": True}

    if state.get("step") == "topic_selection" and state.get("intent_options") and len(state["intent_options"]) > 0:
        # STEP 1: Topic selection
        selected_topic = state["intent_options"][0]
        state["selected_topic_intent"] = selected_topic

        # Generate answer type options
        answer_type_options = generate_answer_type_options(state.get("original_query", ""), selected_topic)
        state["answer_type_options"] = answer_type_options
        state["step"] = "answer_type_selection"
        state["show_options"] = True

        # Remove only the intent options bubble, keep user query
        if chatbot_history and len(chatbot_history) >= 2:
            # Keep user query, remove intent options
            cleaned_history = chatbot_history[:-1]
        else:
            cleaned_history = chatbot_history

        # Add answer type options
        final_history, _ = show_answer_type_options_clean(cleaned_history, state, answer_type_options)

        yield final_history, state, gr.update(visible=True), gr.update(visible=True), gr.update(visible=False)

    elif state.get("step") == "answer_type_selection" and state.get("answer_type_options") and len(state["answer_type_options"]) > 0:
        # STEP 2: Answer type selection
        selected_answer_type = state["answer_type_options"][0]

        state["show_options"] = False
        state["processing"] = True

        # Remove only the answer type options, keep user query
        if chatbot_history and len(chatbot_history) >= 2:
            # Keep user query and any other messages, remove answer type options
            cleaned_history = chatbot_history[:-1]
        else:
            cleaned_history = chatbot_history

        # Show processing message
        processing_msg = f"🔄 **Processing your analysis...**\n\nAnalyzing '{state.get('original_query', '')}' using the CollabSearch framework..."
        processing_message = {"role": "assistant", "content": processing_msg}
        processing_history = cleaned_history + [processing_message]

        # Hide buttons during processing
        yield processing_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)

        # Process query
        try:
            answer = add_predictions_sequential_intelligent(
                state.get("original_query", ""),
                state.get("selected_topic_intent", ""),
                selected_answer_type,
                state.get("query_id"),
                state
            )

            # REPLACE processing message with final answer
            final_message = {"role": "assistant", "content": str(answer)}
            final_history = cleaned_history + [final_message]

            state["processing"] = False
            state["show_options"] = False

            yield final_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)

        except Exception as e:
            error_msg = f"❌ **Error processing query:** {str(e)}"
            error_message = {"role": "assistant", "content": error_msg}
            error_history = cleaned_history + [error_message]

            state["processing"] = False
            state["show_options"] = False

            yield error_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)
    else:
        yield chatbot_history, state, gr.update(visible=True), gr.update(visible=state.get("show_options", False)), gr.update(visible=False)
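
The three option handlers differ only in which list index they read, so the state transition they share can be expressed once and parameterized by index. The sketch below shows just that transition, with no Gradio calls; `select_option` is a hypothetical name, and the answer-type options are pre-filled here, whereas the handlers generate them after the topic is chosen.

```python
def select_option(state, index):
    """Advance the two-step selection for the button at position index (0-based).

    Returns (kind, value), where kind is "topic", "answer_type", or None when
    the click does not apply to the current step.
    """
    step = state.get("step", "topic_selection")
    if step == "topic_selection":
        options = state.get("intent_options") or []
        if index < len(options):
            state["selected_topic_intent"] = options[index]
            state["step"] = "answer_type_selection"
            return "topic", options[index]
    elif step == "answer_type_selection":
        options = state.get("answer_type_options") or []
        if index < len(options):
            state["selected_answer_type"] = options[index]
            state["processing"] = True
            return "answer_type", options[index]
    return None, None


state = {
    "step": "topic_selection",
    "intent_options": ["history", "economics"],
    "answer_type_options": ["summary", "pros and cons", "step-by-step"],
}
print(select_option(state, 2))  # third button, but only two topics exist
print(select_option(state, 1))  # second topic
print(select_option(state, 2))  # third answer type
```

A thin wrapper per button (index 0, 1, 2) could then call this function and handle the UI updates, which would keep the three handlers from drifting apart.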


In [None]:
def handle_option_2_click_enhanced_dynamic(chatbot_history, state):
    '''
    Handle Option 2 button clicks in two-step disambiguation process, mirroring Option 1 functionality for second choice.
    Manages topic and answer type selection, cleans chat history, executes CollabSearch processing, controls UI visibility and error handling.
    '''
    if not state:
        yield chatbot_history, {}, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)
        return

    current_step = state.get("step", "topic_selection")

    if current_step == "topic_selection" and state.get("intent_options") and len(state["intent_options"]) > 1:
        selected_topic = state["intent_options"][1]
        state["selected_topic_intent"] = selected_topic

        answer_type_options = generate_answer_type_options(state.get("original_query", ""), selected_topic)
        state["answer_type_options"] = answer_type_options
        state["step"] = "answer_type_selection"
        state["show_options"] = True

        # Clean history and show answer type options
        if chatbot_history and len(chatbot_history) >= 2:
            cleaned_history = chatbot_history[:-1]
        else:
            cleaned_history = chatbot_history

        final_history, _ = show_answer_type_options_clean(cleaned_history, state, answer_type_options)
        yield final_history, state, gr.update(visible=True), gr.update(visible=True), gr.update(visible=False)

    elif current_step == "answer_type_selection" and state.get("answer_type_options") and len(state["answer_type_options"]) > 1:
        selected_answer_type = state["answer_type_options"][1]

        state["show_options"] = False
        state["processing"] = True

        # Clean history
        if chatbot_history and len(chatbot_history) >= 2:
            cleaned_history = chatbot_history[:-1]
        else:
            cleaned_history = chatbot_history

        # Show processing
        processing_msg = f"🔄 **Processing your analysis...**\n\nAnalyzing '{state.get('original_query', '')}' using the CollabSearch framework..."
        processing_message = {"role": "assistant", "content": processing_msg}
        processing_history = cleaned_history + [processing_message]

        yield processing_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)

        # Process
        try:
            answer = add_predictions_sequential_intelligent(
                state.get("original_query", ""),
                state.get("selected_topic_intent", ""),
                selected_answer_type,
                state.get("query_id"),
                state
            )

            final_message = {"role": "assistant", "content": str(answer)}
            final_history = cleaned_history + [final_message]

            state["processing"] = False
            state["show_options"] = False

            yield final_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)

        except Exception as e:
            error_msg = f"❌ **Error processing query:** {str(e)}"
            error_message = {"role": "assistant", "content": error_msg}
            error_history = cleaned_history + [error_message]

            state["processing"] = False
            state["show_options"] = False

            yield error_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)
    else:
        yield chatbot_history, state, gr.update(visible=True), gr.update(visible=state.get("show_options", False)), gr.update(visible=False)

In [None]:
def handle_option_3_click_enhanced_dynamic(chatbot_history, state):
    '''
    Handle Option 3 button clicks in two-step disambiguation process, completing the trio of selection options.
    Manages topic and answer type selection for third choice, cleans chat history, executes CollabSearch processing, controls UI visibility.
    '''
    if not state:
        yield chatbot_history, {}, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)
        return

    current_step = state.get("step", "topic_selection")

    if current_step == "topic_selection" and state.get("intent_options") and len(state["intent_options"]) > 2:
        selected_topic = state["intent_options"][2]
        state["selected_topic_intent"] = selected_topic

        answer_type_options = generate_answer_type_options(state.get("original_query", ""), selected_topic)
        state["answer_type_options"] = answer_type_options
        state["step"] = "answer_type_selection"
        state["show_options"] = True

        # Clean history and show answer type options
        if chatbot_history and len(chatbot_history) >= 2:
            cleaned_history = chatbot_history[:-1]
        else:
            cleaned_history = chatbot_history

        final_history, _ = show_answer_type_options_clean(cleaned_history, state, answer_type_options)
        yield final_history, state, gr.update(visible=True), gr.update(visible=True), gr.update(visible=False)

    elif current_step == "answer_type_selection" and state.get("answer_type_options") and len(state["answer_type_options"]) > 2:
        selected_answer_type = state["answer_type_options"][2]

        state["show_options"] = False
        state["processing"] = True

        # Clean history
        if chatbot_history and len(chatbot_history) >= 2:
            cleaned_history = chatbot_history[:-1]
        else:
            cleaned_history = chatbot_history

        # Show processing
        processing_msg = f"🔄 **Processing your analysis...**\n\nAnalyzing '{state.get('original_query', '')}' using the COLA framework..."
        processing_message = {"role": "assistant", "content": processing_msg}
        processing_history = cleaned_history + [processing_message]

        yield processing_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)

        # Process
        try:
            answer = add_predictions_sequential_intelligent(
                state.get("original_query", ""),
                state.get("selected_topic_intent", ""),
                selected_answer_type,
                state.get("query_id"),
                state
            )

            final_message = {"role": "assistant", "content": str(answer)}
            final_history = cleaned_history + [final_message]

            state["processing"] = False
            state["show_options"] = False

            yield final_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)

        except Exception as e:
            error_msg = f"❌ **Error processing query:** {str(e)}"
            error_message = {"role": "assistant", "content": error_msg}
            error_history = cleaned_history + [error_message]

            state["processing"] = False
            state["show_options"] = False

            yield error_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False)
    else:
        yield chatbot_history, state, gr.update(visible=True), gr.update(visible=state.get("show_options", False)), gr.update(visible=False)
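
The Option 2 and Option 3 handlers above differ only in the list index they read. As a minimal sketch of that shared shape, the generator below is parameterized by index. It leaves out the Gradio visibility updates, and `make_option_selector`, `generate_options`, and `run_analysis` are hypothetical stand-ins rather than functions from this notebook.

```python
def make_option_selector(index, generate_options, run_analysis):
    """Return a generator handler for the option button at `index` (0-based).

    generate_options and run_analysis are hypothetical stand-ins for the
    notebook's answer-type generation and analysis calls.
    """
    def handler(history, state):
        step = state.get("step", "topic_selection")
        topics = state.get("intent_options", [])
        answer_types = state.get("answer_type_options", [])
        # Drop the last message (the option prompt), as the handlers above do
        cleaned = history[:-1] if len(history) >= 2 else list(history)

        if step == "topic_selection" and len(topics) > index:
            state["selected_topic_intent"] = topics[index]
            state["answer_type_options"] = generate_options(
                state.get("original_query", ""), topics[index])
            state["step"] = "answer_type_selection"
            yield cleaned, state
        elif step == "answer_type_selection" and len(answer_types) > index:
            # First yield: progress message; second yield: final answer
            yield cleaned + [{"role": "assistant", "content": "Processing..."}], state
            answer = run_analysis(
                state.get("original_query", ""),
                state.get("selected_topic_intent", ""),
                answer_types[index])
            yield cleaned + [{"role": "assistant", "content": str(answer)}], state
        else:
            # Nothing to select at this index: leave the history unchanged
            yield history, state
    return handler


# Usage with stub callables (all values here are illustrative)
option_3 = make_option_selector(
    2,
    generate_options=lambda query, topic: ["Overview", "Tutorial", "Comparison"],
    run_analysis=lambda query, topic, answer_type: f"{topic} / {answer_type}",
)
demo_state = {"step": "topic_selection", "original_query": "python",
              "intent_options": ["Language", "Snake", "Monty Python"]}
first = list(option_3([{"role": "user", "content": "python"},
                       {"role": "assistant", "content": "Pick a topic"}], demo_state))
second = list(option_3(first[-1][0] + [{"role": "assistant", "content": "Pick a type"}],
                       demo_state))
```

The first call advances the state from topic selection to answer type selection. The second call yields twice, so a UI can show a progress message before the final answer replaces it.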

In [None]:
def handle_other_option_click_dynamic(chatbot_history, state):
    '''
    Handle "Other" option clicks to allow custom topic or answer type specification.
    Prompts user for custom input based on current step (topic/answer type), manages UI transitions, updates state for custom input processing.
    '''
    if not state:
        return chatbot_history, {}, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ""

    current_step = state.get("step", "topic_selection")

    if current_step == "topic_selection":
        # Topic selection phase - ask for custom topic
        state["waiting_for_custom"] = True
        state["custom_input_type"] = "topic"
        state["show_options"] = False
        state["show_main_input"] = False

        # Clean history and show custom input request
        cleaned_history = chatbot_history[:-1] if chatbot_history else []

        custom_msg = """🔧 **Custom Topic Selection**

The provided options don't match what you're looking for? No problem!

Please specify your preferred topic or domain in the text box below. For example:
- "Python machine learning"
- "Apple company stock"
- "Java coffee brewing"

*Tip: Be as specific as possible to get the best results.*"""

        custom_message = {"role": "assistant", "content": custom_msg}
        updated_history = cleaned_history + [custom_message]

        return updated_history, state, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), ""

    elif current_step == "answer_type_selection":
        # Answer type selection phase
        state["waiting_for_custom"] = True
        state["custom_input_type"] = "answer_type"
        state["show_options"] = False
        state["show_main_input"] = False

        # Clean history and show custom input request
        cleaned_history = chatbot_history[:-1] if chatbot_history else []

        custom_msg = """🔧 **Custom Answer Type**

Need a different type of response? Please specify what kind of answer you're looking for:

Examples:
- "Step-by-step tutorial"
- "Pros and cons comparison"
- "Historical timeline"
- "Technical specifications"

*Tip: Describe the format or style of response you prefer.*"""

        custom_message = {"role": "assistant", "content": custom_msg}
        updated_history = cleaned_history + [custom_message]

        return updated_history, state, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), ""

    else:
        return chatbot_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ""


In [None]:
def process_custom_input_with_spellcheck(user_input, input_type):
    '''
    Process and clean user's custom input with basic spell checking for common technical terms.
    Applies corrections for frequent misspellings of programming languages, tech concepts, and common words using regex substitution.
    '''

    # Basic cleaning
    cleaned_input = user_input.strip()

    # Basic spell corrections
    spell_corrections = {
        "phyton": "Python", "pyhton": "Python", "pythn": "Python",
        "javas": "Java", "jave": "Java",
        "javascript": "JavaScript", "js": "JavaScript",
        "machien learning": "machine learning",
        "artifical intelligence": "artificial intelligence",
        "blockchian": "blockchain",
        "cyrptocurrency": "cryptocurrency",
        "tutorail": "tutorial", "tutoral": "tutorial",
        "comparision": "comparison", "comparsion": "comparison",
        "recomendation": "recommendation", "recomendations": "recommendations"
    }

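    # Apply corrections on whole words only, so that short keys such as "js" or
    # "javas" cannot rewrite the inside of longer words ("json", "javascript")
    for wrong, correct in spell_corrections.items():
        pattern = r"\b" + re.escape(wrong) + r"\b"
        cleaned_input = re.sub(pattern, correct, cleaned_input, flags=re.IGNORECASE)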

    return cleaned_input
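
When a correction table like the one above is applied, the result depends on whether keys are matched as substrings or as whole words. The sketch below compares the two on a small, hypothetical subset of the table. `correct_substrings` and `correct_whole_words` are illustrative names, not functions from this notebook.

```python
import re

# Hypothetical subset of a correction table, for illustration only
CORRECTIONS = {"js": "JavaScript", "javas": "Java", "tutorail": "tutorial"}

def correct_substrings(text):
    """Replace every occurrence of a key, even inside longer words."""
    for wrong, right in CORRECTIONS.items():
        text = re.sub(re.escape(wrong), right, text, flags=re.IGNORECASE)
    return text

def correct_whole_words(text):
    """Replace only occurrences delimited by word boundaries (\\b)."""
    for wrong, right in CORRECTIONS.items():
        pattern = r"\b" + re.escape(wrong) + r"\b"
        text = re.sub(pattern, right, text, flags=re.IGNORECASE)
    return text

sample = "json tutorail for js"
substring_result = correct_substrings(sample)    # "json" is rewritten as well
whole_word_result = correct_whole_words(sample)  # "json tutorial for JavaScript"
```

Substring matching corrupts "json" because it contains the key "js". Word-boundary matching leaves it alone while still correcting the standalone "js" and "tutorail".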

In [None]:
def handle_custom_input_submit_dynamic(custom_input, chatbot_history, state):
    '''
    Process custom user input during disambiguation with validation, spell checking, and state management.
    Handles custom topic or answer type specification, validates input, updates state transitions, manages UI element visibility.
    '''
    if not state or not state.get("waiting_for_custom"):
        yield chatbot_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ""
        return

    if not custom_input or not custom_input.strip():
        error_msg = {"role": "assistant", "content": "❌ Please enter your custom option before submitting."}
        yield chatbot_history + [error_msg], state, gr.update(visible=False), gr.update(visible=False), gr.update(visible=True), custom_input
        return

    # Process the custom input
    input_type = state.get("custom_input_type", "topic")
    processed_input = process_custom_input_with_spellcheck(custom_input, input_type)

    # Clean state
    state["waiting_for_custom"] = False
    state["custom_input_type"] = ""
    state["show_main_input"] = True

    current_step = state.get("step", "topic_selection")

    if current_step == "topic_selection":
        # Handle custom topic selection
        state["selected_topic_intent"] = processed_input

        answer_type_options = generate_answer_type_options(state.get("original_query", ""), processed_input)
        state["answer_type_options"] = answer_type_options
        state["step"] = "answer_type_selection"
        state["show_options"] = True

        # Clean history and show answer type options
        cleaned_history = chatbot_history[:-1] if chatbot_history else []
        final_history, _ = show_answer_type_options_clean(cleaned_history, state, answer_type_options)

        yield final_history, state, gr.update(visible=True), gr.update(visible=True), gr.update(visible=False), ""

    elif current_step == "answer_type_selection":
        # Handle custom answer type - use generator pattern
        yield from handle_custom_answer_type_processing(processed_input, chatbot_history, state)

In [None]:
def handle_custom_answer_type_processing(processed_input, chatbot_history, state):
    '''
    Generator function for processing custom answer type selection with real-time UI updates.
    Handles processing state management, shows progress messages, executes COLA analysis, provides error handling with yield pattern.
    '''
    state["selected_answer_type"] = processed_input
    state["show_options"] = False
    state["processing"] = True

    # Clean history
    cleaned_history = chatbot_history[:-1] if chatbot_history else []

    # Show processing message
    processing_msg = f"🔄 **Processing your analysis...**\n\nAnalyzing '{state.get('original_query', '')}' with custom answer type: {processed_input}"
    processing_message = {"role": "assistant", "content": processing_msg}
    processing_history = cleaned_history + [processing_message]

    # Clean state
    state["waiting_for_custom"] = False
    state["custom_input_type"] = ""
    state["show_options"] = False
    state["show_main_input"] = True

    # First yield - show processing
    yield processing_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ""

    # Process
    try:
        answer = add_predictions_sequential_intelligent(
            state.get("original_query", ""),
            state.get("selected_topic_intent", ""),
            processed_input,
            state.get("query_id"),
            state
        )

        # Replace processing with final answer
        final_message = {"role": "assistant", "content": str(answer)}
        final_history = cleaned_history + [final_message]

        state["processing"] = False
        state["show_options"] = False

        yield final_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ""

    except Exception as e:
        error_msg = f"❌ **Processing Error:** {str(e)}"
        error_message = {"role": "assistant", "content": error_msg}
        error_history = cleaned_history + [error_message]

        state["processing"] = False
        state["show_options"] = False

        yield error_history, state, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ""


In [None]:
def handle_custom_input_cancel_dynamic(chatbot_history, state):
    '''
    Handle cancellation of custom input during disambiguation process.
    Clears custom input state, removes custom input request message, restores option selection UI, adds cancellation confirmation.
    '''
    if not state:
        return chatbot_history, {}, gr.update(visible=True), gr.update(visible=False), gr.update(visible=False), ""

    # Clear custom input state
    state["waiting_for_custom"] = False
    state["custom_input_type"] = ""
    state["show_options"] = True
    state["show_main_input"] = True

    # Remove custom input request message
    cleaned_history = chatbot_history[:-1] if chatbot_history else []

    # Add cancellation message
    cancel_msg = {"role": "assistant", "content": "❌ **Custom input cancelled.** Please select one of the provided options above."}
    updated_history = cleaned_history + [cancel_msg]

    return updated_history, state, gr.update(visible=True), gr.update(visible=True), gr.update(visible=False), ""

In [None]:
def safe_close():
    '''
    Clean shutdown function
    '''
    print("🔄 Closing application...")
    # Add any cleanup here
    return "✅ Application closed safely. You can close the browser tab now."

## User Interface Call

In [None]:
def create_enhanced_gradio_interface():
    '''
    Create enhanced Gradio web interface for CollabSearch framework with dual-intent disambiguation and RAG enhancement.
    Initializes database, sets up chatbot UI, option buttons, custom input handling, state management, and event handlers for complete user interaction.
    '''
    # Initialize database at startup
    try:
        initialize_database_with_sources()
    except Exception as e:
        print(f"❌ Database initialization failed: {e}")

    with gr.Blocks(title="COLA Framework - Enhanced Collaborative Search") as demo:

        # Chat interface
        chatbot = gr.Chatbot(
            label="Enhanced Collaborative Search with Dual Intent Clarification",
            type="messages",
            height=500,
            show_label=True,
            container=True
        )

        # Main input components
        with gr.Row(visible=True) as main_input_row:
            with gr.Column():
                msg = gr.Textbox(
                    label="Your query",
                    placeholder="Ask me anything! (e.g., 'What is Python?', 'How to learn Java?')",
                    lines=2,
                    max_lines=5
                )
                send_btn = gr.Button("🚀 Send Query", variant="primary", size="lg")

        # Intent selection buttons
        with gr.Row(visible=False) as option_buttons_row:
            option1_btn = gr.Button("Option 1", size="lg", variant="secondary")
            option2_btn = gr.Button("Option 2", size="lg", variant="secondary")
            option3_btn = gr.Button("Option 3", size="lg", variant="secondary")
            option4_btn = gr.Button("Other", size="lg", variant="secondary")

        # Custom input components
        with gr.Row(visible=False) as custom_input_row:
            with gr.Column():
                custom_input = gr.Textbox(
                    label="Your custom option",
                    placeholder="Type your preferred topic or answer type here...",
                    lines=2
                )
                with gr.Row():
                    submit_custom_btn = gr.Button("✅ Submit Custom Option", variant="primary")
                    cancel_custom_btn = gr.Button("❌ Cancel", variant="secondary")

        # RAG enhancement button
        extra_btn = gr.Button("🔄 Fact Check with Google", variant="secondary")

        # Management buttons
        with gr.Row():
            stats_btn = gr.Button("📊 View Database Stats")
            close_btn = gr.Button("🔴 Close App", variant="stop")

        stats_output = gr.Textbox(label="Database Information", lines=10, visible=False)

        # Enhanced state management
        state = gr.State({
            "query_id": None,
            "original_query": "",
            "processing_query_id": None,
            "intent_options": [],
            "answer_type_options": [],
            "selected_topic_intent": "",
            "selected_answer_type": "",
            "step": "topic_selection",
            "working_query": "",
            "topic": "",
            "target_role_map": {},
            "waiting_for_custom": False,
            "custom_input_type": "",
            "show_options": False,
            "processing": False,
            "show_main_input": True,
            "currently_processing": None
        })

        # Event handlers - direct function calls (no wrapper for generators)

        # Main query handlers (generators)
        send_btn.click(
            slow_echo_with_dual_intent_disambiguation_dynamic,
            inputs=[msg, chatbot, state],
            outputs=[chatbot, state, msg, main_input_row, option_buttons_row, custom_input_row]
        ).then(lambda: "", outputs=[msg])

        msg.submit(
            slow_echo_with_dual_intent_disambiguation_dynamic,
            inputs=[msg, chatbot, state],
            outputs=[chatbot, state, msg, main_input_row, option_buttons_row, custom_input_row]
        ).then(lambda: "", outputs=[msg])

        # Option handlers (Options 1-3 are generators; "Other" is a plain function)
        option1_btn.click(
            handle_option_1_click_enhanced_dynamic,
            inputs=[chatbot, state],
            outputs=[chatbot, state, main_input_row, option_buttons_row, custom_input_row]
        )

        option2_btn.click(
            handle_option_2_click_enhanced_dynamic,
            inputs=[chatbot, state],
            outputs=[chatbot, state, main_input_row, option_buttons_row, custom_input_row]
        )

        option3_btn.click(
            handle_option_3_click_enhanced_dynamic,
            inputs=[chatbot, state],
            outputs=[chatbot, state, main_input_row, option_buttons_row, custom_input_row]
        )

        option4_btn.click(
            handle_other_option_click_dynamic,
            inputs=[chatbot, state],
            outputs=[chatbot, state, main_input_row, option_buttons_row, custom_input_row, custom_input]
        )

        # Custom input handlers (submit is a generator; cancel is a plain function)
        submit_custom_btn.click(
            handle_custom_input_submit_dynamic,
            inputs=[custom_input, chatbot, state],
            outputs=[chatbot, state, main_input_row, option_buttons_row, custom_input_row, custom_input]
        )

        cancel_custom_btn.click(
            handle_custom_input_cancel_dynamic,
            inputs=[chatbot, state],
            outputs=[chatbot, state, main_input_row, option_buttons_row, custom_input_row, custom_input]
        )

        # RAG handler (generator)
        extra_btn.click(
            execute_rag_update,
            inputs=[chatbot, state],
            outputs=[chatbot, state, msg]
        )

        # Stats handler (non-generator): a single update sets both visibility and
        # value, so stats_output is listed only once in outputs
        def show_stats():
            try:
                stats = view_comprehensive_database_stats()
            except Exception as e:
                stats = f"Error loading stats: {e}"
            return gr.update(visible=True, value=stats)

        stats_btn.click(show_stats, outputs=[stats_output])

        # Close handler: reuse the module-level safe_close() and reveal the
        # initially hidden stats_output box so the message is actually shown
        def show_close_message():
            return gr.update(visible=True, value=safe_close())

        close_btn.click(show_close_message, outputs=[stats_output])

        demo.launch(share=True)

In [None]:
create_enhanced_gradio_interface()

✅ Database already exists
* Running on local URL:  http://127.0.0.1:7864
* Running on public URL: https://e9deef606019946315.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


🚀 STARTING QUERY PROCESSING
📋 Query ID: 1755694204422243
📝 Query: Lastest Iphone models
📚 Existing chat history: 0 messages
✅ Added query 1755694204422243 to database
🎯 Generated options: ['Best Iphone Models for Photography', 'Latest Iphone Models for Business', 'Top Iphone Models for Gaming']

🔒 PROCESSING: Query 1755694204422243
📝 Query: Lastest Iphone models
🎯 Intent: Best Iphone Models for Photography
📄 Answer Type: Comparison/Evaluation (pros and cons of latest iPhone models for photography)
✏️ Rewritten Query: What are the pros and cons of the latest iPhone models for photography, and which one is the best for capturing high-quality images?
🔍 Processing answer type: 'comparison/evaluation'
✅ PARTIAL DICTIONARY MATCH: 'comparison/evaluation' ↔ 'comparison' → 'DECISION_MAKING'
🎯 Synthesis Method: DECISION_MAKING
🏷️ Topic: Best Iphone Models for Photography
👥 Roles: ['iPhone Photographer', 'Mobile Photography Specialist', 'Apple Device Analyst']
⚙️ SYNTHESIS: DECISION_MAKING
✅ Upda

## Technical Insights and Innovations

### Novel Architectural Decisions
1. **Dual Intent Clarification**: Separates topic disambiguation from answer type selection
2. **Method-Specific Synthesis**: Different synthesis approaches based on user intent
3. **Session-Based RAG**: Uses previous analysis to improve search relevance
4. **Comprehensive Process Tracking**: Every routing and synthesis decision logged
5. **Adaptive Role-Playing Prompting**: Generalizes the CoLA framework so that expert roles are generated for any topic and used for multi-perspective analysis

### Performance Optimizations
1. **Dictionary-First Routing**: Fast deterministic routing before expensive LLM calls
2. **Parallel Expert Analysis**: Multiple agents can process simultaneously
3. **Selective RAG Enhancement**: Optional enhancement based on user needs
4. **State Caching**: Preserves processing results across UI interactions
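
As a rough illustration of dictionary-first routing, the sketch below tries an exact lookup, then a partial match, and only then calls a slower fallback. The table contents and function names are assumptions made for this example. Only the `DECISION_MAKING` method and the `'comparison'` key appear in the log output above.

```python
# Hypothetical routing table; only "comparison" -> DECISION_MAKING is taken
# from the log output, the other entries are assumed for illustration
ROUTING_TABLE = {
    "comparison": "DECISION_MAKING",
    "tutorial": "HOW_TO",    # assumed method name
    "overview": "SUMMARY",   # assumed method name
}

def route_answer_type(answer_type, llm_fallback):
    """Return (synthesis_method, match_kind) for a requested answer type."""
    key = answer_type.strip().lower()
    # 1. Exact dictionary hit
    if key in ROUTING_TABLE:
        return ROUTING_TABLE[key], "exact"
    # 2. Partial hit: a known key occurs inside the requested type
    for known, method in ROUTING_TABLE.items():
        if known in key:
            return method, "partial"
    # 3. Only now pay for the slower fallback (an LLM call in practice)
    return llm_fallback(answer_type), "fallback"

fallback_calls = []

def stub_llm(answer_type):
    """Stand-in for an LLM-based router; records that it was invoked."""
    fallback_calls.append(answer_type)
    return "GENERAL"  # assumed default method name

routed = route_answer_type("Comparison/Evaluation", stub_llm)
unknown = route_answer_type("Haiku", stub_llm)
```

In this sketch, `"Comparison/Evaluation"` resolves through the partial match without touching the fallback, and only the unmatched `"Haiku"` request reaches the stub.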

### Scalability Considerations
1. **Modular Architecture**: Easy to add new synthesis methods or experts
2. **Pluggable Components**: RAG, routing, and synthesis can be independently modified
3. **Configuration-Driven**: API endpoints and parameters externally configurable
4. **Error Isolation**: Component failures don't cascade to other system parts
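
One way to picture error isolation is a small wrapper that contains the failure of a single component and hands a fallback value to the next stage. This is a sketch under assumptions: `run_isolated`, `failing_search`, and `synthesize` are hypothetical names and do not come from the notebook.

```python
def run_isolated(name, component, *args, fallback=None):
    """Call one pipeline component; on failure return (fallback, error text)."""
    try:
        return component(*args), None
    except Exception as exc:  # broad on purpose: any component failure is contained
        return fallback, f"{name} failed: {exc}"

def failing_search(query):
    """Stand-in for a retrieval step whose backend is down."""
    raise TimeoutError("search backend unavailable")

def synthesize(query, sources):
    """Stand-in for a synthesis step that can work without sources."""
    return f"Answer to '{query}' using {len(sources)} sources"

# The search failure is recorded, and synthesis still runs on the fallback value
sources, search_error = run_isolated("rag_search", failing_search, "iphone", fallback=[])
answer, synth_error = run_isolated("synthesis", synthesize, "iphone", sources, fallback="")
```

Returning the error text alongside the result lets the caller log the failure and decide whether to surface it, without aborting the remaining stages.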

---

This analysis shows how the CollabSearch Framework combines intent disambiguation, role-based expert analysis, and optional RAG enhancement while logging each processing stage. The architecture illustrates one way to design a multi-agent system with reliability, user experience, and processing transparency in mind.