13 changes: 11 additions & 2 deletions README.md
@@ -19,8 +19,17 @@ An MCP server for ClickHouse.
* List all databases on your ClickHouse cluster.

* `list_tables`
* List all tables in a database.
* Input: `database` (string): The name of the database.
* List tables in a database with pagination.
* Required input: `database` (string).
* Optional inputs:
* `like` / `not_like` (string): Apply `LIKE` or `NOT LIKE` filters to table names.
* `page_token` (string): Token returned by a previous call for fetching the next page.
* `page_size` (int, default `50`): Number of tables returned per page.
* `include_detailed_columns` (bool, default `true`): When `false`, omits column metadata for lighter responses while keeping the full `create_table_query`.
* Response shape (see the paging sketch after this list):
* `tables`: Array of table objects for the current page.
* `next_page_token`: Pass this value back to fetch the next page, or `null` when there are no more tables.
* `total_tables`: Total count of tables that match the supplied filters.
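
For example, a caller can walk every page by passing `next_page_token` back until it comes back as `null`. A minimal sketch, assuming the usual `CLICKHOUSE_*` connection variables are already configured; it calls the underlying Python function directly rather than going through the MCP protocol, and the database name is just an example:

```python
# Minimal paging sketch (assumes ClickHouse connection settings are provided via env vars).
from mcp_clickhouse.mcp_server import list_tables

page_token = None
all_tables = []
while True:
    result = list_tables(
        "default",                       # example database name
        page_size=25,
        page_token=page_token,
        include_detailed_columns=False,  # lighter payload; create_table_query is still complete
    )
    all_tables.extend(result["tables"])
    page_token = result["next_page_token"]
    if page_token is None:               # no more pages
        break

print(f"Collected {len(all_tables)} of {result['total_tables']} tables")
```

Each `next_page_token` is single-use, so consume it in order rather than storing it for later reuse.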

### chDB Tools

8 changes: 8 additions & 0 deletions mcp_clickhouse/__init__.py
@@ -8,6 +8,10 @@
create_chdb_client,
run_chdb_select_query,
chdb_initial_prompt,
table_pagination_cache,
fetch_table_names_from_system,
get_paginated_table_data,
create_page_token,
)


@@ -26,4 +30,8 @@
"create_chdb_client",
"run_chdb_select_query",
"chdb_initial_prompt",
"table_pagination_cache",
"fetch_table_names_from_system",
"get_paginated_table_data",
"create_page_token",
]
250 changes: 233 additions & 17 deletions mcp_clickhouse/mcp_server.py
@@ -1,15 +1,17 @@
import logging
import json
from typing import Optional, List, Any
from typing import Optional, List, Any, Dict
import concurrent.futures
import atexit
import os
import uuid

import clickhouse_connect
import chdb.session as chs
from clickhouse_connect.driver.binding import format_query_value
from dotenv import load_dotenv
from fastmcp import FastMCP
from cachetools import TTLCache
from fastmcp.tools import Tool
from fastmcp.prompts import Prompt
from fastmcp.exceptions import ToolError
@@ -135,36 +137,250 @@ def list_databases():
return json.dumps(databases)


def list_tables(database: str, like: Optional[str] = None, not_like: Optional[str] = None):
"""List available ClickHouse tables in a database, including schema, comment,
row count, and column count."""
logger.info(f"Listing tables in database '{database}'")
client = create_clickhouse_client()
query = f"SELECT database, name, engine, create_table_query, dependencies_database, dependencies_table, engine_full, sorting_key, primary_key, total_rows, total_bytes, total_bytes_uncompressed, parts, active_parts, total_marks, comment FROM system.tables WHERE database = {format_query_value(database)}"
# Store pagination state for list_tables with 1-hour expiry
# Using TTLCache from cachetools to automatically expire entries after 1 hour
table_pagination_cache: TTLCache = TTLCache(maxsize=100, ttl=3600) # 3600 seconds = 1 hour


def fetch_table_names_from_system(
client,
database: str,
like: Optional[str] = None,
not_like: Optional[str] = None,
) -> List[str]:
"""Get list of table names from system.tables.

Args:
client: ClickHouse client
database: Database name
like: Optional pattern to filter table names (LIKE)
not_like: Optional pattern to filter out table names (NOT LIKE)

Returns:
List of table names
"""
query = f"SELECT name FROM system.tables WHERE database = {format_query_value(database)}"
if like:
query += f" AND name LIKE {format_query_value(like)}"

if not_like:
query += f" AND name NOT LIKE {format_query_value(not_like)}"

result = client.query(query)
table_names = [row[0] for row in result.result_rows]
return table_names


def get_paginated_table_data(
client,
database: str,
table_names: List[str],
start_idx: int,
page_size: int,
include_detailed_columns: bool = True,
) -> tuple[List[Table], int, bool]:
"""Get detailed information for a page of tables.

Args:
client: ClickHouse client
database: Database name
table_names: List of all table names to paginate
start_idx: Starting index for pagination
page_size: Number of tables per page
include_detailed_columns: Whether to include detailed column metadata (default: True)

Returns:
Tuple of (list of Table objects, end index, has more pages)
"""
end_idx = min(start_idx + page_size, len(table_names))
current_page_table_names = table_names[start_idx:end_idx]

if not current_page_table_names:
return [], end_idx, False

query = f"""
SELECT database, name, engine, create_table_query, dependencies_database,
dependencies_table, engine_full, sorting_key, primary_key, total_rows,
total_bytes, total_bytes_uncompressed, parts, active_parts, total_marks, comment
FROM system.tables
WHERE database = {format_query_value(database)}
AND name IN ({", ".join(format_query_value(name) for name in current_page_table_names)})
"""

# Deserialize result as Table dataclass instances
result = client.query(query)
tables = result_to_table(result.column_names, result.result_rows)

for table in tables:
column_data_query = f"SELECT database, table, name, type AS column_type, default_kind, default_expression, comment FROM system.columns WHERE database = {format_query_value(database)} AND table = {format_query_value(table.name)}"
column_data_query_result = client.query(column_data_query)
table.columns = [
c
for c in result_to_column(
if include_detailed_columns:
for table in tables:
column_data_query = f"""
SELECT database, table, name, type AS column_type, default_kind, default_expression, comment
FROM system.columns
WHERE database = {format_query_value(database)}
AND table = {format_query_value(table.name)}
"""
column_data_query_result = client.query(column_data_query)
table.columns = result_to_column(
column_data_query_result.column_names,
column_data_query_result.result_rows,
)
]
else:
for table in tables:
table.columns = []

return tables, end_idx, end_idx < len(table_names)


def create_page_token(
database: str,
like: Optional[str],
not_like: Optional[str],
table_names: List[str],
end_idx: int,
include_detailed_columns: bool,
) -> str:
"""Create a new page token and store it in the cache.

Args:
database: Database name
like: LIKE pattern used to filter tables
not_like: NOT LIKE pattern used to filter tables
table_names: List of all table names
end_idx: Index to start from for the next page
include_detailed_columns: Whether to include detailed column metadata

Returns:
New page token
"""
token = str(uuid.uuid4())
table_pagination_cache[token] = {
"database": database,
"like": like,
"not_like": not_like,
"table_names": table_names,
"start_idx": end_idx,
"include_detailed_columns": include_detailed_columns,
}
return token


def list_tables(
database: str,
like: Optional[str] = None,
not_like: Optional[str] = None,
page_token: Optional[str] = None,
page_size: int = 50,
include_detailed_columns: bool = True,
) -> Dict[str, Any]:
"""List available ClickHouse tables in a database, including schema, comment,
row count, and column count.

Args:
database: The database to list tables from
like: Optional LIKE pattern to filter table names
not_like: Optional NOT LIKE pattern to exclude table names
page_token: Token for pagination, obtained from a previous call
page_size: Number of tables to return per page (default: 50)
include_detailed_columns: Whether to include detailed column metadata (default: True).
When False, the columns array will be empty but create_table_query still contains
all column information. This reduces payload size for large schemas.

Returns:
A dictionary containing:
- tables: List of table information (as dictionaries)
- next_page_token: Token for the next page, or None if no more pages
- total_tables: Total number of tables matching the filters
"""
logger.info(
"Listing tables in database '%s' with like=%s, not_like=%s, "
"page_token=%s, page_size=%s, include_detailed_columns=%s",
database,
like,
not_like,
page_token,
page_size,
include_detailed_columns,
)
client = create_clickhouse_client()

if page_token and page_token in table_pagination_cache:
cached_state = table_pagination_cache[page_token]
cached_include_detailed = cached_state.get("include_detailed_columns", True)

if (
cached_state["database"] != database
or cached_state["like"] != like
or cached_state["not_like"] != not_like
or cached_include_detailed != include_detailed_columns
):
logger.warning(
"Page token %s is for a different database, filter, or metadata setting. "
"Ignoring token and starting from beginning.",
page_token,
)
page_token = None
else:
table_names = cached_state["table_names"]
start_idx = cached_state["start_idx"]

tables, end_idx, has_more = get_paginated_table_data(
client,
database,
table_names,
start_idx,
page_size,
include_detailed_columns,
)

next_page_token = None
if has_more:
next_page_token = create_page_token(
database, like, not_like, table_names, end_idx, include_detailed_columns
)

del table_pagination_cache[page_token]

logger.info(
"Returned page with %s tables (total: %s), next_page_token=%s",
len(tables),
len(table_names),
next_page_token,
)
return {
"tables": [asdict(table) for table in tables],
"next_page_token": next_page_token,
"total_tables": len(table_names),
}

table_names = fetch_table_names_from_system(client, database, like, not_like)

start_idx = 0
tables, end_idx, has_more = get_paginated_table_data(
client,
database,
table_names,
start_idx,
page_size,
include_detailed_columns,
)

next_page_token = None
if has_more:
next_page_token = create_page_token(
database, like, not_like, table_names, end_idx, include_detailed_columns
)

logger.info(
"Found %s tables, returning %s with next_page_token=%s",
len(table_names),
len(tables),
next_page_token,
)

logger.info(f"Found {len(tables)} tables")
return [asdict(table) for table in tables]
return {
"tables": [asdict(table) for table in tables],
"next_page_token": next_page_token,
"total_tables": len(table_names),
}


def execute_query(query: str):
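Two behaviours of the token cache above are easy to miss: a token is deleted as soon as the page it points at has been served (so each token is single-use), and a token minted under different `database`, `like`/`not_like`, or `include_detailed_columns` values is ignored and the listing restarts from the first page. A minimal sketch against the in-process cache, with illustrative table names and no ClickHouse connection needed:

```python
# Illustrative sketch of the single-use, TTL-bound page tokens (no ClickHouse required).
from mcp_clickhouse import create_page_token, table_pagination_cache

token = create_page_token(
    database="default",                  # example values
    like=None,
    not_like=None,
    table_names=["events", "users", "orders", "logs"],
    end_idx=2,                           # the next page resumes at index 2
    include_detailed_columns=False,
)

state = table_pagination_cache[token]
assert state["start_idx"] == 2           # picks up where the previous page stopped

# list_tables removes the token once its page has been served, so reuse is rejected:
del table_pagination_cache[token]
assert token not in table_pagination_cache

# Unused tokens also age out on their own after the cache's 1-hour TTL.
```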
1 change: 1 addition & 0 deletions pyproject.toml
@@ -12,6 +12,7 @@ dependencies = [
"clickhouse-connect>=0.8.16",
"truststore>=0.10",
"chdb>=3.3.0",
"cachetools>=5.5.0",
]

[project.scripts]
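The new `cachetools` dependency provides the `TTLCache` backing the pagination state. A small sketch of the expiry behaviour it supplies (the 2-second TTL is only for demonstration; the server uses 3600 seconds):

```python
import time

from cachetools import TTLCache

cache = TTLCache(maxsize=100, ttl=2)     # entries live for 2 seconds
cache["token"] = {"start_idx": 50}
assert "token" in cache

time.sleep(2.1)
assert "token" not in cache              # expired automatically, like stale page tokens
```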