Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ An [MCP](https://modelcontextprotocol.io/) server implementation of Couchbase th
- Get a list of all the scopes in the specified bucket
- Get a list of all the collections in a specified scope and bucket. Note that this tool requires the cluster to have Query service.
- Get the structure for a collection
- List all indexes in the cluster with their definitions, with optional filtering by bucket, scope, and collection
- Get a document by ID from a specified scope and collection
- Upsert a document by ID to a specified scope and collection
- Delete a document by ID from a specified scope and collection
Expand Down
7 changes: 7 additions & 0 deletions src/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,11 @@
This module contains all the MCP tools for Couchbase operations.
"""

# Index tools
from .index import (
list_indexes,
)

# Key-Value tools
from .kv import (
delete_document_by_id,
Expand Down Expand Up @@ -40,6 +45,7 @@
delete_document_by_id,
get_schema_for_collection,
run_sql_plus_plus_query,
list_indexes,
]

__all__ = [
Expand All @@ -55,6 +61,7 @@
"delete_document_by_id",
"get_schema_for_collection",
"run_sql_plus_plus_query",
"list_indexes",
# Convenience
"ALL_TOOLS",
]
115 changes: 115 additions & 0 deletions src/tools/index.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess you will have a merge conflict with #58.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I am aware of that. I will merge main once the other one gets merged.

Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
"""
Tools for index operations.

This module contains tools for listing and managing indexes in the Couchbase cluster.
"""

import logging
from typing import Any

from mcp.server.fastmcp import Context

from tools.query import run_cluster_query
from utils.constants import MCP_SERVER_NAME
from utils.index_utils import generate_index_definition

logger = logging.getLogger(f"{MCP_SERVER_NAME}.tools.index")


def list_indexes(
    ctx: Context,
    bucket_name: str | None = None,
    scope_name: str | None = None,
    collection_name: str | None = None,
) -> list[dict[str, Any]]:
    """List all indexes in the cluster with optional filtering by bucket, scope, and collection.

    Returns a simplified list of indexes with their names, primary flag, and
    CREATE INDEX definitions. Excludes sequential scan indexes. For GSI indexes,
    includes the CREATE INDEX definition.

    Args:
        ctx: MCP context for cluster connection
        bucket_name: Optional bucket name to filter indexes
        scope_name: Optional scope name to filter indexes (requires bucket_name)
        collection_name: Optional collection name to filter indexes
            (requires bucket_name and scope_name)

    Returns:
        List of dictionaries with keys: name (str), is_primary (bool),
        definition (str, GSI only)

    Raises:
        ValueError: If scope_name is given without bucket_name, or
            collection_name is given without both bucket_name and scope_name.
    """
    try:
        # Build the query with filters based on provided parameters.
        # Filter values are passed as named query parameters rather than
        # interpolated into the statement, to avoid N1QL injection.
        query = "SELECT * FROM system:all_indexes"
        conditions: list[str] = []
        params: dict[str, str] = {}

        if bucket_name:
            conditions.append("bucket_id = $bucket_name")
            params["bucket_name"] = bucket_name

        if scope_name:
            if not bucket_name:
                raise ValueError("bucket_name is required when filtering by scope_name")
            conditions.append("scope_id = $scope_name")
            params["scope_name"] = scope_name

        if collection_name:
            if not bucket_name or not scope_name:
                raise ValueError(
                    "bucket_name and scope_name are required when filtering by collection_name"
                )
            conditions.append("keyspace_id = $collection_name")
            params["collection_name"] = collection_name

        if conditions:
            query += " WHERE " + " AND ".join(conditions)

        query += " ORDER BY bucket_id, scope_id, keyspace_id, name"

        # Execute query with parameters
        logger.info(f"Executing query: {query} with params: {params}")
        result = run_cluster_query(ctx, query, **params)

        indexes: list[dict[str, Any]] = []
        for row in result:
            # When querying system:all_indexes, each row wraps the index data
            # in an 'all_indexes' key; fall back to the row itself for safety.
            index_data = row.get("all_indexes", row)

            # Skip sequential scan "indexes" -- they are implicit and not
            # user-manageable, so a CREATE INDEX definition makes no sense.
            using = index_data.get("using", "").lower()
            if using == "sequentialscan":
                continue

            # Normalize system:all_indexes field names to the shape expected by
            # generate_index_definition. Note: no redundant 'index_type' key --
            # the generator reads 'using' only.
            temp_data = {
                "name": index_data.get("name"),
                "bucket": index_data.get("bucket_id"),
                "scope": index_data.get("scope_id"),
                "collection": index_data.get("keyspace_id"),
                "is_primary": index_data.get("is_primary", False),
                "index_key": index_data.get("index_key", []),
                "condition": index_data.get("condition"),
                "partition": index_data.get("partition"),
                "using": index_data.get("using", "gsi"),
            }

            # Generate index definition (GSI indexes only; None otherwise).
            index_definition = generate_index_definition(temp_data)

            # Only return the essential information.
            index_info: dict[str, Any] = {
                "name": index_data.get("name"),
                "is_primary": index_data.get("is_primary", False),
            }

            # Add definition only if it was generated (GSI indexes only).
            if index_definition:
                index_info["definition"] = index_definition

            indexes.append(index_info)

        logger.info(f"Found {len(indexes)} indexes (excluding sequential scans)")
        return indexes
    except Exception as e:
        logger.error(f"Error listing indexes: {e}")
        raise
7 changes: 7 additions & 0 deletions src/utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
get_cluster_connection,
)

# Index utilities
from .index_utils import (
generate_index_definition,
)

# Note: Individual modules create their own hierarchical loggers using:
# logger = logging.getLogger(f"{MCP_SERVER_NAME}.module.name")

Expand All @@ -46,6 +51,8 @@
# Context
"AppContext",
"get_cluster_connection",
# Index utilities
"generate_index_definition",
# Constants
"MCP_SERVER_NAME",
"DEFAULT_READ_ONLY_MODE",
Expand Down
75 changes: 75 additions & 0 deletions src/utils/index_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
Utility functions for index operations.

This module contains helper functions for working with Couchbase indexes.
"""

import logging
from typing import Any

from .constants import MCP_SERVER_NAME

logger = logging.getLogger(f"{MCP_SERVER_NAME}.utils.index_utils")


def generate_index_definition(index_data: dict[str, Any]) -> str | None:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an easier way to get this data? The concern here is that there might be more parameters than the ones we parse like in the case of vector search such as with.
Did you try the management API in the SDK to see if it has something out of the box?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think anything return Index Definition in SDK though management API. I can recheck if some update is there. We are using the same approach in VS Code and jetbrains for this reason only. I will recheck.

"""Generate CREATE INDEX statement for GSI indexes.

Args:
index_data: Dictionary containing index information with keys:
- name: Index name
- bucket: Bucket name
- scope: Scope name (optional)
- collection: Collection name (optional)
- is_primary: Boolean indicating if it's a primary index
- using: Index type (must be "gsi" for definition generation)
- index_key: List of index keys
- condition: WHERE condition (optional)
- partition: PARTITION BY clause (optional)

Returns:
CREATE INDEX statement string for GSI indexes, None for other types
"""
# Only generate definition for GSI indexes
if index_data.get("using") != "gsi":
return None

try:
# Start building the definition
if index_data.get("is_primary"):
query_definition = "CREATE PRIMARY INDEX"
else:
query_definition = "CREATE INDEX"

# Add index name
query_definition += f" `{index_data['name']}`"

# Add bucket name
query_definition += f" ON `{index_data['bucket']}`"

# Add scope and collection if they exist
scope = index_data.get("scope")
collection = index_data.get("collection")
if scope and collection:
query_definition += f".`{scope}`.`{collection}`"

# Add index keys for non-primary indexes
index_keys = index_data.get("index_key", [])
if index_keys and len(index_keys) > 0:
keys_str = ", ".join(str(key) for key in index_keys)
query_definition += f"({keys_str})"

# Add WHERE condition if exists
condition = index_data.get("condition")
if condition:
query_definition += f" WHERE {condition}"

# Add PARTITION BY if exists
partition = index_data.get("partition")
if partition:
query_definition += f" PARTITION BY {partition}"

return query_definition
except Exception as e:
logger.warning(f"Error generating index definition: {e}")
return None
Comment on lines +33 to +75
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The function could generate an invalid CREATE INDEX statement if name or bucket are missing or None in index_data. This can happen if the query to system:all_indexes returns incomplete data. It's safer to add explicit validation for these required fields at the beginning of the function to prevent generating invalid SQL and to make the function more robust. While the try...except block catches some errors, proactive validation is a better practice.

    # Only generate definition for GSI indexes
    if index_data.get("using") != "gsi":
        return None

    name = index_data.get("name")
    bucket = index_data.get("bucket")

    if not name or not bucket:
        logger.warning(
            f"Cannot generate index definition due to missing name or bucket. Data: {index_data}"
        )
        return None

    try:
        # Start building the definition
        if index_data.get("is_primary"):
            query_definition = "CREATE PRIMARY INDEX"
        else:
            query_definition = "CREATE INDEX"

        # Add index name
        query_definition += f" `{name}`"

        # Add bucket name
        query_definition += f" ON `{bucket}`"

        # Add scope and collection if they exist
        scope = index_data.get("scope")
        collection = index_data.get("collection")
        if scope and collection:
            query_definition += f".`{scope}`.`{collection}`"

        # Add index keys for non-primary indexes
        index_keys = index_data.get("index_key", [])
        if index_keys:
            keys_str = ", ".join(str(key) for key in index_keys)
            query_definition += f"({keys_str})"

        # Add WHERE condition if exists
        condition = index_data.get("condition")
        if condition:
            query_definition += f" WHERE {condition}"

        # Add PARTITION BY if exists
        partition = index_data.get("partition")
        if partition:
            query_definition += f" PARTITION BY {partition}"

        return query_definition
    except Exception as e:
        logger.warning(f"Error generating index definition: {e}")
        return None