Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,15 @@ Add to your MCP client configuration (e.g., Claude Desktop, Windsurf):
- **Claude Code** (CLI)
- **Windsurf** / **Claude Desktop** / **Continue.dev** / **Cursor IDE**

#### Options variables

- `DATABEND_DSN`: Databend connection string
- `LOCAL_MODE`: Set to `true` to use local Databend
- `SAFE_MODE`: Set to `false` to disable safe mode
- `DATABEND_MCP_SERVER_TRANSPORT`: Default to `stdio`, set to `http` or `sse` to enable HTTP/SSE transport
- `DATABEND_MCP_BIND_HOST`: Default to `127.0.0.1`, set to bind host for HTTP/SSE transport
- `DATABEND_MCP_BIND_PORT`: Default to `8001`, set to bind port for HTTP/SSE transport

### Step 4: Start Using

Once configured, you can ask your AI assistant to:
Expand Down
53 changes: 53 additions & 0 deletions mcp_databend/env.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,19 @@

from dataclasses import dataclass
import os
from enum import Enum


class TransportType(Enum):
"""Transport types for MCP server."""
STDIO = "stdio"
HTTP = "http"
SSE = "sse"

@classmethod
def values(cls):
"""Get all transport type values."""
return [member.value for member in cls]


@dataclass
Expand Down Expand Up @@ -40,6 +53,46 @@ def local_mode(self) -> bool:
"on",
)

@property
def mcp_server_transport(self) -> str:
"""Get the MCP server transport method.

Valid options: "stdio", "http", "sse"
Default: "stdio"
"""
transport = os.getenv("DATABEND_MCP_SERVER_TRANSPORT", TransportType.STDIO.value).lower()

# Validate transport type
if transport not in TransportType.values():
valid_options = ", ".join(f'"{t}"' for t in TransportType.values())
raise ValueError(f"Invalid transport '{transport}'. Valid options: {valid_options}")
return transport

@property
def mcp_bind_host(self) -> str:
"""Get the MCP server bind host for HTTP/SSE transports.

Default: "127.0.0.1"
"""
return os.getenv("DATABEND_MCP_BIND_HOST", "127.0.0.1")

@property
def mcp_bind_port(self) -> int:
"""Get the MCP server bind port for HTTP/SSE transports.

Default: 8001
"""
port_str = os.getenv("DATABEND_MCP_BIND_PORT", "8001")
try:
port = int(port_str)
if port < 1 or port > 65535:
raise ValueError(f"Port must be between 1 and 65535, got {port}")
return port
except ValueError as e:
if "invalid literal" in str(e):
raise ValueError(f"Invalid port value '{port_str}'. Must be a valid integer.")
raise


# Global instance placeholder for the singleton pattern
_CONFIG_INSTANCE = None
Expand Down
17 changes: 15 additions & 2 deletions mcp_databend/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,26 @@
import sys
import logging
from .server import mcp, logger
from .env import get_config, TransportType


def main():
"""Main entry point for the MCP server."""
try:
logger.info("Starting Databend MCP Server...")
mcp.run()
config = get_config()
transport = config.mcp_server_transport

logger.info(f"Starting Databend MCP Server with transport: {transport}")

# For HTTP and SSE transports, we need to specify host and port
http_transports = [TransportType.HTTP.value, TransportType.SSE.value]
if transport in http_transports:
# Use the configured bind host (defaults to 127.0.0.1, can be set to 0.0.0.0)
# and bind port (defaults to 8001)
mcp.run(transport=transport, host=config.mcp_bind_host, port=config.mcp_bind_port)
else:
# For stdio transport, no host or port is needed
mcp.run(transport=transport)
except KeyboardInterrupt:
logger.info("Shutting down server by user request")
except Exception as e:
Expand Down
81 changes: 54 additions & 27 deletions mcp_databend/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
import logging
import sys
import re
from mcp.server.fastmcp import FastMCP
from fastmcp import FastMCP
from fastmcp.tools import Tool
import concurrent.futures
from dotenv import load_dotenv
import pyarrow as pa
import atexit
from typing import Optional
from .env import get_config
from .env import get_config, TransportType

# Constants
SERVER_NAME = "mcp-databend"
Expand Down Expand Up @@ -165,7 +166,11 @@ def _execute_sql(sql: str) -> dict:
logger.warning(error_msg)
return {"status": "error", "message": error_msg}

return result
# Ensure we always return a dict structure for fastmcp compatibility
if isinstance(result, list):
return {"status": "success", "data": result, "row_count": len(result)}
else:
return {"status": "success", "data": result}

except concurrent.futures.TimeoutError:
error_msg = f"Query timed out after {DEFAULT_TIMEOUT} seconds"
Expand All @@ -179,8 +184,7 @@ def _execute_sql(sql: str) -> dict:
return {"status": "error", "message": error_msg}


@mcp.tool()
async def execute_sql(sql: str) -> dict:
def execute_sql(sql: str) -> dict:
"""
Execute SQL query against Databend database with MCP safe mode protection.

Expand All @@ -196,15 +200,13 @@ async def execute_sql(sql: str) -> dict:
return _execute_sql(sql)


@mcp.tool()
async def show_databases():
def show_databases():
"""List available Databend databases (safe operation, not affected by MCP safe mode)"""
logger.info("Listing all databases")
return _execute_sql("SHOW DATABASES")


@mcp.tool()
async def show_tables(database: Optional[str] = None, filter: Optional[str] = None):
def show_tables(database: Optional[str] = None, filter: Optional[str] = None):
"""
List available Databend tables in a database (safe operation, not affected by MCP safe mode)
Args:
Expand All @@ -222,9 +224,20 @@ async def show_tables(database: Optional[str] = None, filter: Optional[str] = No
sql += f" WHERE {filter}"
return _execute_sql(sql)

def show_functions(filter: Optional[str] = None):
"""List available Databend functions (safe operation, not affected by MCP safe mode)
Args:
filter: The filter string, eg: "name like 'add%'"
Returns:
Dictionary containing either query results or error information
"""
logger.info("Listing all functions")
sql = "SHOW FUNCTIONS"
if filter is not None:
sql += f" WHERE {filter}"
return _execute_sql(sql)

@mcp.tool()
async def describe_table(table: str, database: Optional[str] = None):
def describe_table(table: str, database: Optional[str] = None):
"""
Describe a Databend table (safe operation, not affected by MCP safe mode)
Args:
Expand All @@ -242,15 +255,13 @@ async def describe_table(table: str, database: Optional[str] = None):
return _execute_sql(sql)


@mcp.tool()
def show_stages():
"""List available Databend stages (safe operation, not affected by MCP safe mode)"""
logger.info("Listing all stages")
return _execute_sql("SHOW STAGES")


@mcp.tool()
async def list_stage_files(stage_name: str, path: Optional[str] = None):
def list_stage_files(stage_name: str, path: Optional[str] = None):
"""
List files in a Databend stage (safe operation, not affected by MCP safe mode)
Args:
Expand All @@ -273,24 +284,21 @@ async def list_stage_files(stage_name: str, path: Optional[str] = None):
return _execute_sql(sql)


@mcp.tool()
async def show_connections():
def show_connections():
"""List available Databend connections (safe operation, not affected by MCP safe mode)"""
logger.info("Listing all connections")
return _execute_sql("SHOW CONNECTIONS")


@mcp.tool()
async def create_stage(
name: str, url: str, connection_name: Optional[str] = None, **kwargs
def create_stage(
name: str, url: str, connection_name: Optional[str] = None
) -> dict:
"""
Create a Databend stage with connection
Args:
name: The stage name
url: The stage URL (e.g., 's3://bucket-name')
connection_name: Optional connection name to use
**kwargs: Additional stage options

Returns:
Dictionary containing either query results or error information
Expand All @@ -302,20 +310,39 @@ async def create_stage(
if connection_name:
sql_parts.append(f"CONNECTION = (CONNECTION_NAME = '{connection_name}')")

# Add any additional options from kwargs
for key, value in kwargs.items():
if key not in ["name", "url", "connection_name"]:
sql_parts.append(f"{key.upper()} = '{value}'")

sql = " ".join(sql_parts)
return _execute_sql(sql)


# Register all tools
mcp.add_tool(Tool.from_function(execute_sql))
mcp.add_tool(Tool.from_function(show_databases))
mcp.add_tool(Tool.from_function(show_tables))
mcp.add_tool(Tool.from_function(show_functions))
mcp.add_tool(Tool.from_function(describe_table))
mcp.add_tool(Tool.from_function(show_stages))
mcp.add_tool(Tool.from_function(list_stage_files))
mcp.add_tool(Tool.from_function(show_connections))
mcp.add_tool(Tool.from_function(create_stage))


def main():
"""Main entry point for the MCP server."""
try:
logger.info("Starting Databend MCP Server...")
mcp.run()
config = get_config()
transport = config.mcp_server_transport

logger.info(f"Starting Databend MCP Server with transport: {transport}")

# For HTTP and SSE transports, we need to specify host and port
http_transports = [TransportType.HTTP.value, TransportType.SSE.value]
if transport in http_transports:
# Use the configured bind host (defaults to 127.0.0.1, can be set to 0.0.0.0)
# and bind port (defaults to 8001)
mcp.run(transport=transport, host=config.mcp_bind_host, port=config.mcp_bind_port)
else:
# For stdio transport, no host or port is needed
mcp.run(transport=transport)
except KeyboardInterrupt:
logger.info("Shutting down server by user request")
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ classifiers = [
dependencies = [
"databend>=1.2.810",
"databend-driver>=0.27.3",
"mcp>=1.9.0",
"fastmcp>=2.12.3",
"pyarrow>=21.0.0",
"python-dotenv>=1.1.0",
]
Expand Down
Loading