# Flight Search Agent - Agent Catalog + Couchbase Tutorial

## Overview

This tutorial demonstrates how to build a flight search agent using:
- **Agent Catalog**: Tool registration and agent framework
- **Couchbase**: Vector database for semantic search and data storage
- **LangGraph**: Agent workflow orchestration
- **OpenAI**: Language models and embeddings

### What You'll Learn
1. Setting up Couchbase infrastructure with vector search
2. Loading registered tools from Agent Catalog
3. Building intelligent parameter mapping
4. Creating a ReAct agent with LangGraph
5. Implementing flight search, booking, and policy queries

### Architecture Overview
```
User Query → LangGraph Agent → Agent Catalog Tools → Couchbase Database
                ↓
Parameter Mapping → Tool Execution → Vector Search → Response Generation
```

## Section 1: Environment Setup and Dependencies

First, let's install the required dependencies and set up our environment.

In [None]:
# Install required packages (uncomment if not already installed)
# !pip install langchain langchain-openai langgraph couchbase python-dotenv langchain-couchbase
# !pip install agentc-core agentc-langgraph  # These should be installed from local paths

In [None]:
# Import required libraries
import getpass
import json
import logging
import os
import sys
import time
from datetime import timedelta

import agentc
import agentc_langgraph.agent
import agentc_langgraph.graph
import dotenv
import langchain_core.messages
import langchain_core.runnables
import langchain_openai.chat_models
import langgraph.graph
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.management.buckets import CreateBucketSettings
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions
from langchain_couchbase.vectorstores import CouchbaseSearchVectorStore
from langchain_openai import OpenAIEmbeddings

# Load environment variables
dotenv.load_dotenv(override=True)

# Setup logging
logging.basicConfig(
    level=logging.DEBUG, 
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)

print("All dependencies imported successfully!")

### Environment Variables Setup

Configure the required environment variables for Couchbase and OpenAI connections.

In [None]:
def _set_if_undefined(var: str):
    """Prompt for a value with hidden input if the variable is unset or empty."""
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"Please provide your {var}: ")

def setup_environment():
    """Set up required environment variables, falling back to local defaults."""
    # The API key has no sensible default, so always prompt for it if missing.
    _set_if_undefined("OPENAI_API_KEY")

    defaults = {
        "CB_CONN_STRING": "couchbase://localhost",
        "CB_USERNAME": "Administrator",
        "CB_PASSWORD": "password",
        "CB_BUCKET": "vector-search-testing",
    }

    # Prompt once per missing setting; pressing Enter accepts the default.
    for key, default_value in defaults.items():
        if not os.environ.get(key):
            # Hide the password while typing; echo the other settings.
            read = getpass.getpass if key == "CB_PASSWORD" else input
            os.environ[key] = read(f"Enter {key} (default: {default_value}): ") or default_value

    os.environ["INDEX_NAME"] = os.getenv("INDEX_NAME", "vector_search_agentcatalog")
    os.environ["SCOPE_NAME"] = os.getenv("SCOPE_NAME", "shared")
    os.environ["COLLECTION_NAME"] = os.getenv("COLLECTION_NAME", "agentcatalog")

# Setup environment
setup_environment()
print("Environment variables configured successfully!")
print(f"Using bucket: {os.environ['CB_BUCKET']}")
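
Before connecting, it can help to confirm which variables are still unset. The helper below is a minimal sketch and not part of the original notebook: `missing_env_vars` is a hypothetical name, and the `AGENT_CATALOG_*` names are taken from the Agent Catalog cell in Section 4, which reads them without prompting.

```python
import os
from collections.abc import Iterable, Mapping


def missing_env_vars(required: Iterable[str], env: Mapping[str, str] = os.environ) -> list[str]:
    """Return the names in `required` that are unset or empty in `env`."""
    return [name for name in required if not env.get(name)]


# Variables read later in this notebook without an interactive prompt.
catalog_vars = [
    "AGENT_CATALOG_CONN_STRING",
    "AGENT_CATALOG_USERNAME",
    "AGENT_CATALOG_PASSWORD",
    "AGENT_CATALOG_BUCKET",
]
unset = missing_env_vars(catalog_vars)
if unset:
    print(f"Still unset: {', '.join(unset)}")
```

Running a check like this early surfaces a missing variable as a readable message rather than a `KeyError` several cells later.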

## Section 2: Couchbase Infrastructure Setup

Now let's establish our Couchbase connection and set up the necessary infrastructure.

In [None]:
def setup_couchbase_connection():
    """Setup Couchbase cluster connection."""
    try:
        auth = PasswordAuthenticator(os.environ["CB_USERNAME"], os.environ["CB_PASSWORD"])
        options = ClusterOptions(auth)
        cluster = Cluster(os.environ["CB_CONN_STRING"], options)
        cluster.wait_until_ready(timedelta(seconds=10))
        logger.info("Successfully connected to Couchbase")
        return cluster
    except Exception as e:
        raise ConnectionError(f"Failed to connect to Couchbase: {e!s}") from e

# Establish connection
cluster = setup_couchbase_connection()

In [None]:
def setup_collection(cluster, bucket_name, scope_name, collection_name):
    """Setup Couchbase bucket, scope and collection."""
    try:
        try:
            bucket = cluster.bucket(bucket_name)
            logger.info(f"Bucket '{bucket_name}' exists")
        except Exception:
            logger.info(f"Creating bucket '{bucket_name}'...")
            bucket_settings = CreateBucketSettings(
                name=bucket_name,
                bucket_type="couchbase",
                ram_quota_mb=1024,
                flush_enabled=True,
                num_replicas=0,
            )
            cluster.buckets().create_bucket(bucket_settings)
            time.sleep(5)
            bucket = cluster.bucket(bucket_name)
            logger.info(f"Bucket '{bucket_name}' created successfully")

        bucket_manager = bucket.collections()

        scopes = bucket_manager.get_all_scopes()
        scope_exists = any(scope.name == scope_name for scope in scopes)

        if not scope_exists and scope_name != "_default":
            logger.info(f"Creating scope '{scope_name}'...")
            bucket_manager.create_scope(scope_name)
            logger.info(f"Scope '{scope_name}' created successfully")

        # Re-fetch the scopes so that a newly created scope is included in the check.
        scopes = bucket_manager.get_all_scopes()
        collection_exists = any(
            scope.name == scope_name and collection_name in [col.name for col in scope.collections]
            for scope in scopes
        )

        if not collection_exists:
            logger.info(f"Creating collection '{collection_name}'...")
            bucket_manager.create_collection(scope_name, collection_name)
            logger.info(f"Collection '{collection_name}' created successfully")

        collection = bucket.scope(scope_name).collection(collection_name)
        time.sleep(3)

        try:
            cluster.query(
                f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{bucket_name}`.`{scope_name}`.`{collection_name}`"
            ).execute()
            logger.info("Primary index created successfully")
        except Exception as e:
            logger.warning(f"Error creating primary index: {e!s}")

        logger.info("Collection setup complete")
        return collection
    except Exception as e:
        raise RuntimeError(f"Error setting up collection: {e!s}") from e

# Setup collection
collection = setup_collection(
    cluster,
    os.environ["CB_BUCKET"],
    os.environ["SCOPE_NAME"],
    os.environ["COLLECTION_NAME"]
)
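
The fixed `time.sleep` calls above assume the bucket and collection become available within a few seconds. A polling loop with a timeout is one alternative. This is a sketch under that assumption, and `wait_until` is a hypothetical helper rather than part of the Couchbase SDK.

```python
import time
from typing import Callable


def wait_until(check: Callable[[], bool], timeout: float = 30.0, interval: float = 0.5) -> bool:
    """Call `check` until it returns True or `timeout` seconds elapse.

    Exceptions raised by `check` are treated as "not ready yet".
    Returns True if the check succeeded, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while True:
        try:
            if check():
                return True
        except Exception:
            pass  # resource not ready; retry after the interval
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```

For example, `wait_until(lambda: any(s.name == scope_name for s in bucket.collections().get_all_scopes()))` could stand in for the pause after `create_scope`, using only calls that already appear in `setup_collection`.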

### Vector Search Index Setup

Load the vector search index definition from `agentcatalog_index.json`, which is expected in the notebook's working directory, and create the index if it does not already exist.

In [None]:
def setup_vector_search_index(cluster, index_definition):
    """Setup vector search index for flight data."""
    try:
        scope_index_manager = (
            cluster.bucket(os.environ["CB_BUCKET"]).scope(os.environ["SCOPE_NAME"]).search_indexes()
        )

        existing_indexes = scope_index_manager.get_all_indexes()
        index_name = index_definition["name"]

        if index_name not in [index.name for index in existing_indexes]:
            logger.info(f"Creating vector search index '{index_name}'...")
            search_index = SearchIndex.from_json(index_definition)
            scope_index_manager.upsert_index(search_index)
            logger.info(f"Vector search index '{index_name}' created successfully")
        else:
            logger.info(f"Vector search index '{index_name}' already exists")
    except Exception as e:
        raise RuntimeError(f"Error setting up vector search index: {e!s}") from e

# Load index definition from file (same as main.py)
try:
    with open("agentcatalog_index.json") as file:
        index_definition = json.load(file)
    logger.info("Loaded vector search index definition from agentcatalog_index.json")
    setup_vector_search_index(cluster, index_definition)
except Exception as e:
    logger.warning(f"Error loading index definition: {e!s}")
    logger.info("Continuing without vector search index...")
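
The index is created under the `name` field of the JSON file, while the vector store in the next section looks it up through the `INDEX_NAME` environment variable. If the two differ, searches will target an index that does not exist. The check below is a small sketch; `index_name_matches` is a hypothetical helper, and the dictionary stands in for the loaded file.

```python
def index_name_matches(index_definition: dict, expected_name: str) -> bool:
    """Return True if the definition's "name" field equals `expected_name`."""
    return index_definition.get("name") == expected_name


# Stand-in for the dictionary returned by json.load(...).
definition = {"name": "vector_search_agentcatalog"}
print(index_name_matches(definition, "vector_search_agentcatalog"))  # True
print(index_name_matches(definition, "some_other_index"))  # False
```

In the notebook this could be called as `index_name_matches(index_definition, os.environ["INDEX_NAME"])` right after the file is loaded.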

## Section 3: Flight Data and Vector Store

Load our flight data and set up the vector store for semantic search.

In [None]:
def load_flight_data():
    """Load flight data from our enhanced flight_data.py file."""
    try:
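        # Import flight data. Notebooks do not define __file__, so resolve the
        # data directory relative to the current working directory instead.
        sys.path.append(os.path.join(os.getcwd(), "data"))
        from flight_data import get_all_flight_data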

        flight_data = get_all_flight_data()

        # Convert to text format for vector store
        flight_texts = []
        for item in flight_data:
            text = f"{item['title']} - {item['content']}"
            flight_texts.append(text)

        return flight_texts
    except Exception as e:
        raise ValueError(f"Error loading flight data: {e!s}") from e

def setup_vector_store(cluster):
    """Setup vector store and load flight data."""
    try:
        embeddings = OpenAIEmbeddings(
            api_key=os.environ["OPENAI_API_KEY"], model="text-embedding-3-small"
        )

        vector_store = CouchbaseSearchVectorStore(
            cluster=cluster,
            bucket_name=os.environ["CB_BUCKET"],
            scope_name=os.environ["SCOPE_NAME"],
            collection_name=os.environ["COLLECTION_NAME"],
            embedding=embeddings,
            index_name=os.environ["INDEX_NAME"],
        )

        flight_data = load_flight_data()

        try:
            vector_store.add_texts(texts=flight_data, batch_size=10)
            logger.info("Flight data loaded into vector store successfully")
        except Exception as e:
            logger.warning(
                f"Error loading flight data: {e!s}. Vector store created but data not loaded."
            )

        return vector_store
    except Exception as e:
        raise ValueError(f"Error setting up vector store: {e!s}") from e

# Setup vector store
vector_store = setup_vector_store(cluster)
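
Once the data is loaded, the store can be queried by meaning rather than by keyword. The sketch below relies on `similarity_search`, the standard LangChain vector store method. `top_matches` is a hypothetical helper, and the query string in the usage note is only an illustration.

```python
from typing import Any


def top_matches(store: Any, query: str, k: int = 3) -> list[str]:
    """Return the text of the `k` documents most similar to `query`.

    `store` is any object exposing LangChain's `similarity_search(query, k=...)`.
    """
    documents = store.similarity_search(query, k=k)
    return [doc.page_content for doc in documents]
```

A call such as `top_matches(vector_store, "checked baggage allowance")` would return up to three of the `"title - content"` strings produced by `load_flight_data`, assuming the index and embeddings were set up successfully.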

## Section 4: Agent Catalog Integration

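Initialize the Agent Catalog client and open a top-level span for this application. The cell reads `AGENT_CATALOG_CONN_STRING`, `AGENT_CATALOG_USERNAME`, `AGENT_CATALOG_PASSWORD`, and `AGENT_CATALOG_BUCKET` directly from the environment. `setup_environment()` does not prompt for them, so define them in your `.env` file before running it.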

In [None]:
# Initialize Agent Catalog
catalog = agentc.Catalog(
    conn_string=os.environ["AGENT_CATALOG_CONN_STRING"],
    username=os.environ["AGENT_CATALOG_USERNAME"],
    password=os.environ["AGENT_CATALOG_PASSWORD"],
    bucket=os.environ["AGENT_CATALOG_BUCKET"],
)
application_span = catalog.Span(name="Flight Search Agent")

print("Agent Catalog initialized successfully")

## Section 5: Parameter Mapping Implementation

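A key feature of the flight search agent is LLM-based parameter mapping, which translates the loosely formatted inputs the agent produces into the arguments each tool expects. The `ParameterMapper` class is imported from `parameter_mapper.py`.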

In [None]:
# Import the exact ParameterMapper from parameter_mapper.py
from parameter_mapper import ParameterMapper

# Initialize chat model for parameter mapping
model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
chat_model = langchain_openai.chat_models.ChatOpenAI(model=model_name, temperature=0.1)

# Initialize parameter mapper
parameter_mapper = ParameterMapper(chat_model)

print("Parameter mapper initialized")

# Test parameter mapping
test_text = "I want to fly from New York JFK to Los Angeles LAX"
airports = parameter_mapper.extract_airports_from_text(test_text)
print("\nParameter mapping test:")
print(f"Input: {test_text}")
print(f"Extracted: {airports}")
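
For comparison, a purely rule-based extractor can recover airport codes when they appear verbatim in the text. This sketch is not how `ParameterMapper` works (its implementation lives in `parameter_mapper.py` and is not shown here); `extract_iata_codes` is a hypothetical function that simply matches standalone three-letter uppercase tokens.

```python
import re

# Three consecutive uppercase letters standing alone, e.g. "JFK".
IATA_PATTERN = re.compile(r"\b[A-Z]{3}\b")


def extract_iata_codes(text: str) -> list[str]:
    """Return candidate three-letter airport codes in order of appearance."""
    return IATA_PATTERN.findall(text)


print(extract_iata_codes("I want to fly from New York JFK to Los Angeles LAX"))
# ['JFK', 'LAX']
```

The pattern also matches any other three-letter uppercase token, such as "USA", and it cannot resolve a city name with no code, which is the kind of input an LLM-backed mapper is meant to handle.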

## Section 6: Flight Search Agent Implementation

This is the core FlightSearchAgent class and related state management, copied directly from main.py.

In [None]:
class FlightSearchState(agentc_langgraph.agent.State):
    """State for flight search conversations - single user system."""

    query: str
    resolved: bool
    search_results: list[dict]


class FlightSearchAgent(agentc_langgraph.agent.ReActAgent):
    """Flight search agent using Agent Catalog tools and prompts."""

    def __init__(self, catalog: agentc.Catalog, span: agentc.Span):
        """Initialize the flight search agent."""

        model_name = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
        chat_model = langchain_openai.chat_models.ChatOpenAI(model=model_name, temperature=0.1)

        # Initialize parameter mapper for intelligent parameter handling
        self.parameter_mapper = ParameterMapper(chat_model)

        super().__init__(
            chat_model=chat_model,
            catalog=catalog,
            span=span,
            prompt_name="flight_search_assistant",
        )

    def _invoke(
        self,
        span: agentc.Span,
        state: FlightSearchState,
        config: langchain_core.runnables.RunnableConfig,
    ) -> FlightSearchState:
        """Handle flight search conversation with Agent Catalog tools."""

        # Initialize conversation if this is the first message
        if not state["messages"]:
            initial_msg = langchain_core.messages.HumanMessage(content=state["query"])
            state["messages"].append(initial_msg)
            logger.info(f"Flight Query: {state['query']}")

        try:
            # Get tools from Agent Catalog using find method like hotel agent
            from langchain_core.tools import Tool

            tools = []
            tool_configs = [
                ("lookup_flight_info", "Find flight routes between airports"),
                ("save_flight_booking", "Save flight booking requests"),
                ("retrieve_flight_bookings", "Retrieve existing flight bookings"),
                ("search_flight_policies", "Search flight policies and guidelines"),
            ]

            for tool_name, description in tool_configs:
                try:
                    tool_obj = self.catalog.find("tool", name=tool_name)
                    if tool_obj and hasattr(tool_obj, "func"):
                        # Create a wrapper function using intelligent parameter mapping
                        def create_tool_wrapper(func, tool_name):
                            def wrapper(*args, **kwargs):
                                try:
                                    logger.debug(
                                        f"Tool {tool_name} called with args: {args}, kwargs: {kwargs}"
                                    )

                                    # Case 1: Handle ReAct agent format with multiple positional args
                                    if len(args) > 1:
                                        # ReAct agent passes multiple string arguments like ("SFO", "LAX")
                                        mapped_args = self.parameter_mapper.map_positional_args(
                                            tool_name, args, func
                                        )
                                        return func(**mapped_args)

                                    # Case 2: Handle single string argument (could be JSON or plain string)
                                    elif len(args) == 1 and isinstance(args[0], str) and not kwargs:
                                        input_str = args[0].strip()

                                        # Handle empty string or "None" for tools that take no args
                                        if not input_str or input_str.lower() in ("none", "null"):
                                            if tool_name == "retrieve_flight_bookings":
                                                return func()  # Call with no arguments
                                            else:
                                                # For other tools, try with empty dict
                                                mapped_args = (
                                                    self.parameter_mapper.map_parameters_smart(
                                                        tool_name, {}, func
                                                    )
                                                )
                                                return func(**mapped_args)

                                        # Try to parse as JSON first
                                        try:
                                            parsed_args = json.loads(input_str)

                                            if isinstance(parsed_args, dict):
                                                mapped_args = (
                                                    self.parameter_mapper.map_parameters_smart(
                                                        tool_name, parsed_args, func
                                                    )
                                                )
                                                return func(**mapped_args)
                                        except (json.JSONDecodeError, TypeError):
                                            pass  # Not JSON, continue to other parsing methods

                                        # Handle comma-separated format like "SFO", "LAX"
                                        if "," in input_str and '"' in input_str:
                                            try:
                                                # Parse comma-separated quoted strings
                                                import ast

                                                # Convert string like '"SFO", "LAX"' to tuple
                                                formatted_str = f"({input_str})"
                                                parsed_tuple = ast.literal_eval(formatted_str)
                                                if isinstance(parsed_tuple, tuple):
                                                    mapped_args = (
                                                        self.parameter_mapper.map_positional_args(
                                                            tool_name, parsed_tuple, func
                                                        )
                                                    )
                                                    return func(**mapped_args)
                                            except (ValueError, SyntaxError):
                                                pass

                                        # Fallback: treat as single parameter for search tools
                                        mapped_args = self.parameter_mapper.map_string_input(
                                            tool_name, input_str, func
                                        )
                                        return func(**mapped_args)

                                    # Case 3: Normal function call with kwargs
                                    elif kwargs:
                                        mapped_args = self.parameter_mapper.map_parameters_smart(
                                            tool_name, kwargs, func
                                        )
                                        return func(**mapped_args)

                                    # Case 4: No arguments - for tools like retrieve_flight_bookings
                                    elif len(args) == 0:
                                        if tool_name == "retrieve_flight_bookings":
                                            return func()
                                        else:
                                            mapped_args = (
                                                self.parameter_mapper.map_parameters_smart(
                                                    tool_name, {}, func
                                                )
                                            )
                                            return func(**mapped_args)

                                    # Default fallback
                                    return func(*args, **kwargs)

                                except Exception as e:
                                    logger.error(f"Tool {tool_name} execution error: {e}")
                                    logger.debug(f"Failed with args: {args}, kwargs: {kwargs}")
                                    return f"Error executing {tool_name}: {e!s}"

                            return wrapper

                        # Convert to LangChain Tool format with closure fix
                        wrapped_func = create_tool_wrapper(tool_obj.func, tool_name)
                        lc_tool = Tool(
                            name=tool_obj.meta.name,
                            description=tool_obj.meta.description or description,
                            func=wrapped_func,
                        )
                        tools.append(lc_tool)
                        logger.info(f"Loaded tool: {tool_name}")
                except Exception as e:
                    logger.warning(f"Could not load tool {tool_name}: {e}")

            if not tools:
                logger.error("No tools loaded from catalog")
                error_msg = langchain_core.messages.AIMessage(
                    content="I'm unable to access my flight search tools right now. Please try again later."
                )
                state["messages"].append(error_msg)
                state["resolved"] = True
                return state

            # Create a proper ReAct agent like hotel support agent
            from langchain import hub
            from langchain.agents import AgentExecutor, create_react_agent

            try:
                # Get ReAct prompt from hub
                react_prompt = hub.pull("hwchase17/react")

                # Create ReAct agent with tools
                agent = create_react_agent(self.chat_model, tools, react_prompt)
                agent_executor = AgentExecutor(
                    agent=agent,
                    tools=tools,
                    verbose=True,
                    handle_parsing_errors=True,
                    max_iterations=5,
                )

                # Execute the agent with the query
                query = state["query"]
                result = agent_executor.invoke({"input": query})

                # Create response message
                response_content = result.get(
                    "output", "I was unable to process your flight search request."
                )
                response_msg = langchain_core.messages.AIMessage(content=response_content)
                state["messages"].append(response_msg)

                logger.info(f"Agent Response: {response_content}")

                # Mark as resolved
                state["resolved"] = True

            except Exception as agent_error:
                logger.error(f"ReAct agent error: {agent_error}")
                # Fallback to simple model response
                fallback_response = self.chat_model.invoke(state["messages"])
                state["messages"].append(fallback_response)

                if hasattr(fallback_response, "content"):
                    logger.info(f"Fallback Response: {fallback_response.content}")

                state["resolved"] = True

        except Exception as e:
            logger.error(f"Agent invocation error: {e}")
            error_msg = langchain_core.messages.AIMessage(
                content=f"I encountered an error while processing your request: {e!s}"
            )
            state["messages"].append(error_msg)
            state["resolved"] = True

        return state

print("FlightSearchAgent class defined")
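
The string handling in the wrapper's Case 2 is easier to follow in isolation. The function below is a standalone sketch that mirrors that branch: it only classifies the input and leaves the mapping to parameter names aside. `parse_tool_input` and its return labels are hypothetical names introduced for illustration.

```python
import ast
import json
from typing import Any


def parse_tool_input(raw: str) -> tuple[str, Any]:
    """Classify a raw tool-input string the way the wrapper's Case 2 does.

    Returns a (kind, value) pair where kind is one of
    "empty", "dict", "tuple", or "text".
    """
    text = raw.strip()

    # Empty input or a literal "none"/"null" means "no arguments".
    if text.lower() in ("", "none", "null"):
        return "empty", None

    # A JSON object maps directly onto keyword arguments.
    try:
        parsed = json.loads(text)
        if isinstance(parsed, dict):
            return "dict", parsed
    except (json.JSONDecodeError, TypeError):
        pass

    # Quoted, comma-separated values such as '"SFO", "LAX"' become a tuple.
    if "," in text and '"' in text:
        try:
            candidate = ast.literal_eval(f"({text})")
            if isinstance(candidate, tuple):
                return "tuple", candidate
        except (ValueError, SyntaxError):
            pass

    # Anything else is passed through as free text.
    return "text", text


print(parse_tool_input('"SFO", "LAX"'))  # ('tuple', ('SFO', 'LAX'))
print(parse_tool_input("baggage policy"))  # ('text', 'baggage policy')
```

Separating classification from dispatch in this way would also make the branch testable without a catalog or a chat model.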

## Section 7: LangGraph Workflow Implementation

This is the FlightSearchGraph class copied directly from main.py.

In [None]:
class FlightSearchGraph(agentc_langgraph.graph.GraphRunnable):
    """Flight search conversation graph using Agent Catalog."""

    @staticmethod
    def build_starting_state(query: str) -> FlightSearchState:
        """Build the initial state for the flight search - single user system."""
        return FlightSearchState(
            messages=[],
            query=query,
            resolved=False,
            search_results=[],
            previous_node=None,
        )

    def compile(self) -> langgraph.graph.graph.CompiledGraph:
        """Compile the LangGraph workflow."""

        # Build the flight search agent with catalog integration
        search_agent = FlightSearchAgent(catalog=self.catalog, span=self.span)

        # Create a simple workflow graph for flight search
        workflow = langgraph.graph.StateGraph(FlightSearchState)

        # Add the flight search agent node
        workflow.add_node("flight_search", search_agent)

        # Set entry point and simple flow
        workflow.set_entry_point("flight_search")
        workflow.add_edge("flight_search", langgraph.graph.END)

        return workflow.compile()

# Create the flight search graph
flight_graph = FlightSearchGraph(catalog=catalog, span=application_span)
# Compile the graph
compiled_graph = flight_graph.compile()

print("Flight search graph compiled successfully")

## Section 8: Testing the Flight Search Agent

Now let's test our flight search agent with various scenarios.

In [None]:
def run_flight_search_query(query: str):
    """Run a flight search query and display results."""
    print(f"\nFlight Query: {query}")
    print("=" * 50)
    
    try:
        with application_span.new(f"Query: {query}") as span:
            logger.info(f"Flight Query: {query}")
            span["query"] = query

            # Build starting state - single user system
            state = FlightSearchGraph.build_starting_state(query=query)

            # Run the flight search
            result = compiled_graph.invoke(state)
            span["result"] = result

            # Display results summary
            if result.get("search_results"):
                logger.info(f"Found {len(result['search_results'])} flight options")

            logger.info(f"Search completed: {result.get('resolved', False)}")
            
            return result

    except Exception as e:
        logger.error(f"Search error: {e}")
        return None

print("Query function defined")

In [None]:
# Test query 1: Flight search
result = run_flight_search_query("Find flights from New York JFK to Los Angeles LAX")

In [None]:
# Test query 2: Policy search
result = run_flight_search_query("What's your baggage policy?")

In [None]:
# Test query 3: Booking retrieval
result = run_flight_search_query("Show me my current bookings")

In [None]:
# Test query 4: Flight booking
result = run_flight_search_query("Book a flight from San Francisco to Chicago for tomorrow")

In [None]:
# Test query 5: Cancellation policy
result = run_flight_search_query("What are the cancellation rules?")
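
Each call returns the final graph state, or `None` if the query raised an error, so the agent's answer is the content of the last message in `messages`. The helper below is a small sketch for printing it; `final_reply` is a hypothetical name.

```python
from typing import Any, Optional


def final_reply(result: Optional[dict[str, Any]]) -> str:
    """Return the content of the last message in a graph result, or "" if none."""
    if not result:
        return ""
    messages = result.get("messages") or []
    if not messages:
        return ""
    return str(getattr(messages[-1], "content", ""))
```

After any of the test cells, `print(final_reply(result))` shows just the reply without the surrounding state.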

## Conclusion

You've successfully built a flight search agent using Agent Catalog, Couchbase, LangGraph, and OpenAI.

### What We Accomplished
- **Infrastructure Setup**: Configured a Couchbase bucket, scope, collection, and vector search index
- **Agent Catalog Integration**: Looked up the four flight tools by name with `catalog.find`
- **Smart Parameter Mapping**: Used an LLM-backed `ParameterMapper` to normalize tool inputs
- **LangGraph Agent**: Wrapped a ReAct agent in a single-node LangGraph workflow
- **Vector Search**: Loaded flight data into a Couchbase vector store for semantic retrieval

### Key Features Demonstrated
1. **Flight Information Lookup** - Search routes between airports
2. **Intelligent Booking System** - Create bookings with validation
3. **Booking Management** - Retrieve and manage existing bookings
4. **Policy Search** - Semantic search through flight policies
5. **Smart Parameter Handling** - Flexible input processing

### Further Learning
- [Agent Catalog Documentation](https://docs.agentcatalog.ai)
- [Couchbase Vector Search Guide](https://docs.couchbase.com/server/current/vector-search/vector-search.html)
- [LangGraph Documentation](https://langchain-ai.github.io/langgraph/)

Thank you for completing this tutorial!