# Introduction

In this guide, we will walk you through building a powerful semantic search engine using Couchbase as the backend database and [CrewAI](https://github.com/joaomdmoura/crewAI) for agent-based RAG operations. CrewAI allows us to create specialized agents that can work together to handle different aspects of the RAG workflow, from document retrieval to response generation. This tutorial is designed to be beginner-friendly, with clear, step-by-step instructions that will equip you with the knowledge to create a fully functional semantic search system from scratch.

# Setting the Stage: Installing Necessary Libraries

To build our semantic search engine, we need a robust set of tools. The libraries we install handle everything from connecting to databases to performing complex machine learning tasks.

In [None]:
# Install the third-party packages that the import cell below relies on.
%pip install datasets langchain-couchbase langchain-openai crewai python-dotenv tqdm

# Importing Necessary Libraries

The script starts by importing a series of libraries required for various tasks, including handling JSON, logging, time tracking, Couchbase connections, embedding generation, and dataset loading.

In [None]:
import json
import logging
import time
import sys
import os
from datetime import timedelta
from uuid import uuid4
from typing import Any, Optional
from dotenv import load_dotenv

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import InternalServerFailureException, QueryIndexAlreadyExistsException
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions
from datasets import load_dataset
from langchain_openai import OpenAIEmbeddings, ChatOpenAI
from langchain_core.documents import Document
from langchain_core.globals import set_llm_cache
from langchain_couchbase.cache import CouchbaseCache
from langchain_couchbase.vectorstores import CouchbaseVectorStore
from langchain.tools import Tool
from crewai import Agent, Task, Crew, Process
from tqdm import tqdm

# Setup Logging

Logging is configured to track the progress of the script and capture any errors or warnings.

In [None]:
# Configure the root logger once for the whole notebook: INFO and above,
# each record rendered as "<date time> - <LEVEL> - <message>".
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

# Configuration Settings

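Load configuration settings from environment variables (a `.env` file is read if present). `OPENAI_API_KEY` is required and has no default; the Couchbase settings fall back to the default values shown below.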

In [None]:
# Load environment variables
load_dotenv()

# Configuration
# The OpenAI key has no fallback: stop right away when it is missing or empty.
OPENAI_API_KEY = os.getenv('OPENAI_API_KEY')
if not OPENAI_API_KEY:
    raise ValueError("OPENAI_API_KEY is not set")

# Couchbase connection settings; each uses the literal default when the
# environment variable is unset.
CB_HOST = os.getenv('CB_HOST', 'couchbase://localhost')
CB_USERNAME = os.getenv('CB_USERNAME', 'Administrator')
CB_PASSWORD = os.getenv('CB_PASSWORD', 'password')
CB_BUCKET_NAME = os.getenv('CB_BUCKET_NAME', 'vector-search-testing')
# Name of the search index used by the vector store.
INDEX_NAME = os.getenv('INDEX_NAME', 'vector_search_crew')
# Scope that contains both collections below.
SCOPE_NAME = os.getenv('SCOPE_NAME', 'shared')
# Collection holding the embedded documents.
COLLECTION_NAME = os.getenv('COLLECTION_NAME', 'crew')
# Collection backing the LLM response cache.
CACHE_COLLECTION = os.getenv('CACHE_COLLECTION', 'cache')

# Vector Search Tool Implementation

Define a custom tool for performing vector searches with improved result formatting and error handling.

In [None]:
def vector_search(query: str, vector_store: Any) -> str:
    """Run a similarity search and return the hits as one formatted string.

    Args:
        query: Search text. A dict is reduced to its ``'query'`` entry (empty
            string when the key is absent); any other non-string value is
            converted with ``str()``.
        vector_store: Object exposing
            ``similarity_search(query, k=..., fetch_k=...)`` that returns
            documents with ``page_content`` and, optionally, ``metadata``.

    Returns:
        One section per document (numbered header, separator, content and any
        metadata as ``key: value`` lines), or the message
        ``"No relevant documents found."`` when the search yields nothing.

    Raises:
        Exception: Any error raised while searching or formatting is logged
            and re-raised unchanged.
    """
    try:
        # Agents sometimes hand over structured input; normalise it to text.
        if isinstance(query, dict):
            query = str(query.get('query', ''))
        elif not isinstance(query, str):
            query = str(query)

        # Ask for up to 8 documents. fetch_k is forwarded as an extra keyword;
        # NOTE(review): whether the store honours it depends on the vector
        # store implementation -- confirm.
        docs = vector_store.similarity_search(
            query,
            k=8,
            fetch_k=20
        )

        # Give the caller an explicit signal rather than an empty string,
        # which would read as a blank tool observation.
        if not docs:
            return "No relevant documents found."

        results = []
        for i, doc in enumerate(docs, 1):
            results.append(f"Document {i}:")
            results.append("-" * 40)
            results.append(doc.page_content)

            # Metadata is optional; skip the section when it is missing or empty.
            if hasattr(doc, 'metadata') and doc.metadata:
                results.append("\nMetadata:")
                for key, value in doc.metadata.items():
                    results.append(f"{key}: {value}")

            results.append("\n")  # Blank spacing between documents

        return "\n".join(results)

    except Exception as e:
        logging.error(f"Vector search failed: {str(e)}")
        raise

def create_vector_search_tool(vector_store: Any) -> Tool:
    """Build the ``vector_search`` Tool bound to the given vector store.

    The returned tool takes only the query; the store is captured by the
    inner function and passed on to ``vector_search``.
    """
    def _run(query):
        # Close over the store so callers supply nothing but the query.
        return vector_search(query, vector_store)

    tool_description = """Search for relevant documents using vector similarity.
        Input should be a simple text query string.
        Returns a list of relevant document contents with metadata.
        Use this tool to find detailed information about topics."""

    return Tool(name="vector_search", func=_run, description=tool_description)

# Setting Up Couchbase and Vector Store

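Initialize the Couchbase connection, set up the collections, and create the vector store. Note that the setup step deletes all existing documents in both the main and the cache collection, so every run starts from a clean state.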

In [None]:
def setup_couchbase():
    """Connect to Couchbase and prepare the main and cache collections.

    For each of ``COLLECTION_NAME`` and ``CACHE_COLLECTION`` the collection is
    created inside ``SCOPE_NAME`` when missing, given a primary index, and
    emptied with a ``DELETE`` statement.

    Returns:
        The connected cluster object.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    def _prepare_collection(collection_name):
        """Ensure the collection exists, is indexed, and holds no documents."""
        bucket = cluster.bucket(CB_BUCKET_NAME)
        manager = bucket.collections()

        # Look for the collection inside the configured scope only.
        found = False
        for scope in manager.get_all_scopes():
            if scope.name != SCOPE_NAME:
                continue
            if collection_name in [col.name for col in scope.collections]:
                found = True
                break

        if found:
            logging.info(f"Collection '{collection_name}' already exists")
        else:
            manager.create_collection(SCOPE_NAME, collection_name)
            logging.info(f"Collection '{collection_name}' created")

        # The DELETE below runs as a query, so make sure a primary index exists.
        index_statement = f"CREATE PRIMARY INDEX IF NOT EXISTS ON `{CB_BUCKET_NAME}`.`{SCOPE_NAME}`.`{collection_name}`"
        cluster.query(index_statement).execute()
        logging.info(f"Primary index created for '{collection_name}'")

        # Start from an empty collection on every run.
        clear_statement = f"DELETE FROM `{CB_BUCKET_NAME}`.`{SCOPE_NAME}`.`{collection_name}`"
        cluster.query(clear_statement).execute()
        logging.info(f"Collection '{collection_name}' cleared")

        return bucket.scope(SCOPE_NAME).collection(collection_name)

    try:
        cluster = Cluster(
            CB_HOST,
            ClusterOptions(PasswordAuthenticator(CB_USERNAME, CB_PASSWORD)),
        )
        cluster.wait_until_ready(timedelta(seconds=5))
        logging.info("Successfully connected to Couchbase")

        # Document collection first, then the LLM cache collection.
        for name in (COLLECTION_NAME, CACHE_COLLECTION):
            _prepare_collection(name)

        return cluster

    except Exception as e:
        logging.error(f"Failed to setup Couchbase: {str(e)}")
        raise

def setup_vector_store(cluster):
    """Create the search index if needed, then build the vector store and LLM.

    The index definition is read from ``crew_index.json`` in the working
    directory. Problems while checking or creating the index are logged as a
    warning and do not stop the setup. A Couchbase-backed LLM cache is also
    installed globally via ``set_llm_cache``.

    Args:
        cluster: Connected Couchbase cluster.

    Returns:
        A ``(vector_store, llm)`` tuple.

    Raises:
        Exception: Any other failure is logged and re-raised unchanged.
    """
    try:
        with open('crew_index.json', 'r') as definition_file:
            index_definition = json.load(definition_file)

        bucket_scope = cluster.bucket(CB_BUCKET_NAME).scope(SCOPE_NAME)
        index_manager = bucket_scope.search_indexes()

        # Best effort: the index may already be there even if this check fails.
        try:
            already_defined = any(
                idx.name == INDEX_NAME for idx in index_manager.get_all_indexes()
            )
            if not already_defined:
                index_manager.upsert_index(SearchIndex.from_json(index_definition))
                logging.info(f"Index '{INDEX_NAME}' created")
            else:
                logging.info(f"Index '{INDEX_NAME}' already exists")
        except Exception as e:
            logging.warning(f"Error handling index: {str(e)}")

        # OpenAI components: embeddings for the store, chat model for the agents.
        embeddings = OpenAIEmbeddings(
            openai_api_key=OPENAI_API_KEY,
            model="text-embedding-ada-002",
        )
        llm = ChatOpenAI(openai_api_key=OPENAI_API_KEY, model="gpt-4o", temperature=0)

        # Both the store and the cache live in the same bucket and scope.
        location = {
            'cluster': cluster,
            'bucket_name': CB_BUCKET_NAME,
            'scope_name': SCOPE_NAME,
        }

        vector_store = CouchbaseVectorStore(
            collection_name=COLLECTION_NAME,
            embedding=embeddings,
            index_name=INDEX_NAME,
            **location,
        )
        logging.info("Vector store initialized")

        set_llm_cache(CouchbaseCache(collection_name=CACHE_COLLECTION, **location))
        logging.info("Cache initialized")

        return vector_store, llm

    except Exception as e:
        logging.error(f"Failed to setup vector store: {str(e)}")
        raise

# Load Sample Data

Load the first 1,000 questions from the training split of the TREC dataset and add them to the vector store in batches of 50.

In [None]:
def load_sample_data(vector_store):
    """Load the first 1,000 TREC training texts into the vector store.

    Texts are wrapped in ``Document`` objects, given random UUID ids and added
    in batches of 50. INFO-level logging is suppressed while the batches are
    written and restored afterwards, whether or not the load succeeds.

    Args:
        vector_store: Store exposing ``add_documents(documents=..., ids=...)``.

    Raises:
        Exception: Any failure is logged and re-raised unchanged.
    """
    try:
        trec = load_dataset('trec', split='train[:1000]')
        logging.info(f"Loaded {len(trec)} samples from TREC dataset")

        # Look the column up once instead of repeating it for every batch.
        texts = trec['text']
        batch_size = 50

        # Silence INFO records during the load; the finally clause guarantees
        # logging is restored on both the success and the failure path.
        logging.disable(logging.INFO)
        try:
            for start in tqdm(range(0, len(texts), batch_size), desc="Loading data"):
                batch = texts[start:start + batch_size]
                documents = [Document(page_content=text) for text in batch]
                uuids = [str(uuid4()) for _ in documents]
                vector_store.add_documents(documents=documents, ids=uuids)
        finally:
            logging.disable(logging.NOTSET)

        logging.info("Sample data loaded into vector store")

    except Exception as e:
        logging.error(f"Failed to load sample data: {str(e)}")
        raise

# Setting Up CrewAI Agents

Create specialized agents for research and writing tasks.

In [None]:
def setup_agents(llm, vector_store):
    """Build the researcher and writer agents.

    Both agents share the same LLM, response template and behaviour flags;
    only the researcher is given the vector search tool.

    Args:
        llm: Chat model used by both agents.
        vector_store: Store the researcher's search tool queries.

    Returns:
        A ``(researcher, writer)`` tuple.
    """
    search_tool = create_vector_search_tool(vector_store)

    # Template applied to both agents' responses.
    response_template = """
    Analysis Results:
    ----------------
    {{ .Response }}
    
    Sources Used:
    ------------
    {% for tool in .Tools %}
    - {{ tool.name }}
    {% endfor %}
    
    Confidence Level: {{ .Confidence }}
    Analysis Time: {{ .ExecutionTime }}
    """

    # Settings that are identical for every agent.
    shared_settings = {
        'llm': llm,
        'verbose': True,
        'memory': True,
        'allow_delegation': False,
        'response_template': response_template,
    }

    researcher_backstory = """You are an expert researcher with deep knowledge in information retrieval 
        and analysis. Your expertise lies in finding, evaluating, and synthesizing information 
        from various sources. You have a keen eye for detail and can identify key insights 
        from complex documents. You always verify information across multiple sources and 
        provide comprehensive, accurate analyses."""

    writer_backstory = """You are a skilled technical writer with expertise in making complex 
        information accessible and engaging. You excel at organizing information logically, 
        explaining technical concepts clearly, and creating well-structured documents. You 
        ensure all information is properly cited, accurate, and presented in a user-friendly 
        manner. You have a talent for maintaining the reader's interest while conveying 
        detailed technical information."""

    researcher = Agent(
        role='Research Expert',
        goal='Find and analyze the most relevant documents to answer user queries accurately',
        backstory=researcher_backstory,
        tools=[search_tool],
        **shared_settings,
    )

    writer = Agent(
        role='Technical Writer',
        goal='Generate clear, accurate, and well-structured responses based on research findings',
        backstory=writer_backstory,
        **shared_settings,
    )

    logging.info("Agents created")
    return researcher, writer

# Creating Tasks

Define tasks for research and writing with detailed instructions.

In [None]:
def create_tasks(query, researcher, writer):
    """Build the research task and the writing task for one query.

    The research task embeds the query in its description and is assigned to
    the researcher. The writing task is assigned to the writer and receives
    the research task as context.

    Returns:
        ``[research_task, writing_task]`` in execution order.
    """
    research_description = f"""Research and analyze information relevant to: {query}
        
        Follow these steps:
        1. Use the vector_search tool to find relevant documents
        2. Search with multiple variations of the query to ensure comprehensive coverage
        3. Analyze each document carefully, noting key points and supporting evidence
        4. Cross-reference information across documents to verify accuracy
        5. Identify any conflicting information or gaps in knowledge
        6. Organize findings into clear, logical categories
        
        Focus on:
        - Accuracy and completeness of information
        - Relevance to the query
        - Quality and reliability of sources
        - Key concepts and their relationships
        - Supporting evidence and examples"""

    research_expected = """A detailed analysis containing:
        1. Key findings organized by topic
        2. Supporting evidence from documents
        3. Any conflicting information or uncertainties
        4. Gaps in knowledge that may need further research
        5. Relevant context and background information"""

    writing_description = f"""Create a comprehensive and well-structured response based on the research findings.
        
        Follow these steps:
        1. Review and analyze all research findings
        2. Organize information into a logical structure
        3. Create clear section headings and transitions
        4. Explain complex concepts in accessible language
        5. Include relevant examples and illustrations
        6. Ensure proper citation of sources
        
        The response should be:
        1. Clear and easy to understand
        2. Well-organized with logical flow
        3. Accurate and supported by research
        4. Engaging and informative
        5. Appropriate for the target audience"""

    writing_expected = """A clear, comprehensive response that:
        1. Answers the query completely
        2. Is well-structured and organized
        3. Uses clear, accessible language
        4. Includes relevant examples
        5. Cites supporting evidence
        6. Maintains reader engagement"""

    research_task = Task(
        description=research_description,
        agent=researcher,
        expected_output=research_expected,
    )

    # The writer works from the researcher's output, passed in as context.
    writing_task = Task(
        description=writing_description,
        agent=writer,
        expected_output=writing_expected,
        context=[research_task],
    )

    return [research_task, writing_task]

# Main Search Function

Implement the main search functionality that coordinates the agents and tasks.

In [None]:
def format_response(result: Any) -> str:
    """Render a search result as a banner-delimited text report.

    Args:
        result: The value to display. Falsy values produce a fixed
            "No response generated" message. If the object has a
            ``tasks_output`` attribute, each entry's ``description`` (first
            100 characters) and ``raw`` output are appended.

    Returns:
        The report as a single newline-joined string.
    """
    if not result:
        return "No response generated"

    banner = "=" * 80
    rule = "-" * 40

    # Main response section.
    lines = [banner, "RESPONSE", banner, str(result)]

    # Optional per-task breakdown.
    if hasattr(result, 'tasks_output'):
        lines += ["\n" + banner, "DETAILED TASK OUTPUTS", banner]
        for output in result.tasks_output:
            lines.extend([
                f"\nTask: {output.description[:100]}...",
                rule,
                f"Output: {output.raw}",
                rule,
            ])

    return "\n".join(lines)

def search(query: str, vector_store: Any, researcher: Any, writer: Any) -> Any:
    """Run the researcher/writer crew for one query and return its result.

    Args:
        query: The user question; it is embedded into the research task.
        vector_store: Not used here (the researcher's tool already holds the
            store); kept so existing callers keep working.
        researcher: Agent assigned to the research task.
        writer: Agent assigned to the writing task.

    Returns:
        Whatever ``Crew.kickoff()`` returns. ``format_response`` accepts it
        as-is and looks for a ``tasks_output`` attribute on it.

    Raises:
        Exception: Any failure is logged with its traceback and re-raised.
    """
    try:
        crew = Crew(
            agents=[researcher, writer],
            tasks=create_tasks(query, researcher, writer),
            process=Process.sequential,  # Research first, then writing
            verbose=True,
            cache=True,  # Enable caching
            planning=True  # Enable planning capability
        )
        return crew.kickoff()

    except Exception as e:
        # One record carrying both the message and the traceback.
        logging.exception(f"Search failed: {str(e)}")
        raise

# Running the Search System

Initialize the system and run some example queries.

In [None]:
# Setup components
print("\nInitializing search system...")
print("This may take a few minutes for initial setup.")
print("=" * 80)

# Order matters: the cluster is needed to build the vector store, the store
# must exist before data is loaded, and the agents need both the LLM and the store.
cluster = setup_couchbase()
vector_store, llm = setup_vector_store(cluster)
load_sample_data(vector_store)
researcher, writer = setup_agents(llm, vector_store)

print("\nSetup complete! You can now enter your queries.")
print("=" * 80)

# Example queries
queries = [
    "What caused the 1929 Great Depression?",
    "Why do heavier objects fall faster?",
    "How does photosynthesis work?"
]

# Run each query through the crew and report the wall-clock time it took.
# search() re-raises on failure, so an error in one query stops the loop.
for query in queries:
    print(f"\nQuery: {query}")
    print("-" * 80)
    
    start_time = time.time()
    result = search(query, vector_store, researcher, writer)
    elapsed_time = time.time() - start_time
    
    print(f"\nQuery completed in {elapsed_time:.2f} seconds")
    print(format_response(result))
    print("=" * 80)