From 7c6e96e47eed765cfe8971d4e4598e29d0782f39 Mon Sep 17 00:00:00 2001 From: Joe S Date: Thu, 23 Oct 2025 12:24:24 -0700 Subject: [PATCH 1/4] implement a write mode --- README.md | 37 ++++- mcp_clickhouse/__init__.py | 4 +- mcp_clickhouse/mcp_env.py | 21 +++ mcp_clickhouse/mcp_server.py | 149 +++++++++++++---- tests/test_mcp_server.py | 12 +- tests/test_tool.py | 314 ++++++++++++++++++++++++++++++++++- 6 files changed, 491 insertions(+), 46 deletions(-) diff --git a/README.md b/README.md index 3db4b9e..18eea8b 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,10 @@ An MCP server for ClickHouse. ### ClickHouse Tools -* `run_select_query` +* `run_query` * Execute SQL queries on your ClickHouse cluster. - * Input: `sql` (string): The SQL query to execute. - * All ClickHouse queries are run with `readonly = 1` to ensure they are safe. + * Input: `query` (string): The SQL query to execute. + * Queries run in read-only mode by default (`CLICKHOUSE_READ_ONLY=true`), but writes can be enabled explicitly if needed. * `list_databases` * List all databases on your ClickHouse cluster. @@ -26,7 +26,7 @@ An MCP server for ClickHouse. * `run_chdb_select_query` * Execute SQL queries using [chDB](https://github.com/chdb-io/chdb)'s embedded ClickHouse engine. - * Input: `sql` (string): The SQL query to execute. + * Input: `query` (string): The SQL query to execute. * Query data directly from various sources (files, URLs, databases) without ETL processes. ### Health Check Endpoint @@ -172,6 +172,26 @@ You can also enable both ClickHouse and chDB simultaneously: 4. Restart Claude Desktop to apply the changes. +### Optional Write Access + +By default, this MCP enforces read-only queries so that accidental mutations cannot happen during exploration. To allow DDL or INSERT/UPDATE statements, set the `CLICKHOUSE_READ_ONLY` environment variable to `false`. The server keeps enforcing read-only mode if the ClickHouse instance itself disallows writes. + +### DROP Operation Protection + +Even when write access is enabled (`CLICKHOUSE_READ_ONLY=false`), DROP operations (DROP TABLE, DROP DATABASE, DROP VIEW, DROP DICTIONARY) require an additional opt-in flag for safety. This prevents accidental data deletion during AI exploration. + +To enable DROP operations, set both flags: +```json +"env": { + "CLICKHOUSE_READ_ONLY": "false", + "CLICKHOUSE_ALLOW_DROP": "true" +} +``` + +This two-tier approach ensures that accidental drops are very difficult: +- **Write operations** (INSERT, UPDATE, CREATE) require `CLICKHOUSE_READ_ONLY=false` +- **Destructive operations** (DROP) additionally require `CLICKHOUSE_ALLOW_DROP=true` + ### Running Without uv (Using System Python) If you prefer to use the system Python installation instead of uv, you can install the package from PyPI and run it directly: @@ -321,6 +341,15 @@ The following environment variables are used to configure the ClickHouse and chD * `CLICKHOUSE_ENABLED`: Enable/disable ClickHouse functionality * Default: `"true"` * Set to `"false"` to disable ClickHouse tools when using chDB only +* `CLICKHOUSE_READ_ONLY`: Force read-only query mode + * Default: `"true"` + * Set to `"false"` to allow DDL (CREATE, ALTER, DROP) and DML (INSERT, UPDATE, DELETE) operations + * When enabled, queries run with `readonly=1` setting to prevent data modifications +* `CLICKHOUSE_ALLOW_DROP`: Allow DROP operations (DROP TABLE, DROP DATABASE, DROP VIEW, DROP DICTIONARY) + * Default: `"false"` + * Only takes effect when `CLICKHOUSE_READ_ONLY=false` + * Set to `"true"` to explicitly allow destructive DROP operations + * This is a safety feature to prevent accidental data deletion during AI exploration #### chDB Variables diff --git a/mcp_clickhouse/__init__.py b/mcp_clickhouse/__init__.py index c442d9c..282af0d 100644 --- a/mcp_clickhouse/__init__.py +++ b/mcp_clickhouse/__init__.py @@ -4,7 +4,7 @@ create_clickhouse_client, list_databases, list_tables, - run_select_query, + run_query, create_chdb_client, run_chdb_select_query, chdb_initial_prompt, @@ -21,7 +21,7 @@ __all__ = [ "list_databases", "list_tables", - "run_select_query", + "run_query", "create_clickhouse_client", "create_chdb_client", "run_chdb_select_query", diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index 64e0e8a..05023ff 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -44,6 +44,8 @@ class ClickHouseConfig: CLICKHOUSE_DATABASE: Default database to use (default: None) CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) + CLICKHOUSE_READ_ONLY: Force read-only queries (default: true) + CLICKHOUSE_ALLOW_DROP: Allow DROP operations when writes are enabled (default: false) """ def __init__(self): @@ -126,6 +128,25 @@ def send_receive_timeout(self) -> int: def proxy_path(self) -> str: return os.getenv("CLICKHOUSE_PROXY_PATH") + @property + def read_only(self) -> bool: + """Get whether queries should be forced to read-only mode. + + Default: True + """ + return os.getenv("CLICKHOUSE_READ_ONLY", "true").lower() == "true" + + @property + def allow_drop(self) -> bool: + """Get whether DROP operations (DROP TABLE, DROP DATABASE) are allowed. + + This setting provides an additional safety layer when write access is enabled. + Even with CLICKHOUSE_READ_ONLY=false, DROP operations require this flag. + + Default: False + """ + return os.getenv("CLICKHOUSE_ALLOW_DROP", "false").lower() == "true" + def get_client_config(self) -> dict: """Get the configuration dictionary for clickhouse_connect client. diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 01639b2..04567d4 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -4,6 +4,7 @@ import concurrent.futures import atexit import os +import re import clickhouse_connect import chdb.session as chs @@ -167,11 +168,42 @@ def list_tables(database: str, like: Optional[str] = None, not_like: Optional[st return [asdict(table) for table in tables] +def _validate_query_for_drop(query: str) -> None: + """Validate that DROP operations are allowed. + + Args: + query: The SQL query to validate + + Raises: + ToolError: If the query contains DROP operations but CLICKHOUSE_ALLOW_DROP is not set + """ + config = get_config() + + if config.read_only: + return + + if config.allow_drop: + return + + # Simple pattern matching for DROP operations + query_upper = query.upper() + + drop_pattern = r'\bDROP\s+(TABLE|DATABASE|VIEW|DICTIONARY)\b' + if re.search(drop_pattern, query_upper): + raise ToolError( + "DROP operations are not allowed. " + "Set CLICKHOUSE_ALLOW_DROP=true to enable DROP TABLE/DATABASE operations. " + "This is a safety feature to prevent accidental data deletion." + ) + + def execute_query(query: str): client = create_clickhouse_client() try: - read_only = get_readonly_setting(client) - res = client.query(query, settings={"readonly": read_only}) + _validate_query_for_drop(query) + + query_settings = build_query_settings(client) + res = client.query(query, settings=query_settings) logger.info(f"Query returned {len(res.result_rows)} rows") return {"columns": res.column_names, "rows": res.result_rows} except Exception as err: @@ -179,9 +211,13 @@ def execute_query(query: str): raise ToolError(f"Query execution failed: {str(err)}") -def run_select_query(query: str): - """Run a SELECT query in a ClickHouse database""" - logger.info(f"Executing SELECT query: {query}") +def run_query(query: str): + """Execute a SQL query against ClickHouse. + + Queries run in read-only mode by default (readonly=1). Set CLICKHOUSE_READ_ONLY=false + to allow DDL and DML statements when your ClickHouse server permits them. + """ + logger.info(f"Executing query: {query}") try: future = QUERY_EXECUTOR.submit(execute_query, query) try: @@ -204,10 +240,26 @@ def run_select_query(query: str): except ToolError: raise except Exception as e: - logger.error(f"Unexpected error in run_select_query: {str(e)}") + logger.error("Unexpected error in run_query: %s", str(e)) raise RuntimeError(f"Unexpected error during query execution: {str(e)}") +def _get_query_tool_description() -> str: + config = get_config() + if config.read_only: + return ( + "Execute SQL queries in ClickHouse. Queries run in read-only mode by default. " + "Set CLICKHOUSE_READ_ONLY=false to allow DDL and DML when appropriate." + ) + + drop_status = "DROP operations are allowed" if config.allow_drop else "DROP operations are blocked (set CLICKHOUSE_ALLOW_DROP=true to enable)" + return ( + f"Execute SQL queries in ClickHouse with writes enabled. " + f"CLICKHOUSE_READ_ONLY=false so DDL and DML statements are permitted. " + f"{drop_status}." + ) + + def create_clickhouse_client(): client_config = get_config().get_client_config() logger.info( @@ -229,34 +281,75 @@ def create_clickhouse_client(): raise -def get_readonly_setting(client) -> str: - """Get the appropriate readonly setting value to use for queries. +def build_query_settings(client) -> dict[str, str]: + """Build query settings dict for ClickHouse queries. + + Always returns a dict (possibly empty) to ensure consistent behavior. + """ + readonly_setting = get_readonly_setting(client) + if readonly_setting is not None: + return {"readonly": readonly_setting} + return {} - This function handles potential conflicts between server and client readonly settings: - - readonly=0: No read-only restrictions - - readonly=1: Only read queries allowed, settings cannot be changed - - readonly=2: Only read queries allowed, settings can be changed (except readonly itself) - If server has readonly=2 and client tries to set readonly=1, it would cause: - "Setting readonly is unknown or readonly" error +def get_readonly_setting(client) -> Optional[str]: + """Determine the readonly setting value for queries. - This function preserves the server's readonly setting unless it's 0, in which case - we enforce readonly=1 to ensure queries are read-only. + This implements the following logic: + 1. If CLICKHOUSE_READ_ONLY=false (writes enabled): + - Allow writes if server permits (server readonly=None or "0") + - Fall back to server's readonly setting if server enforces it + - Log a warning when falling back - Args: - client: ClickHouse client connection + 2. If CLICKHOUSE_READ_ONLY=true (default, read-only mode): + - Enforce readonly=1 if server allows writes + - Respect server's readonly setting if server enforces stricter mode Returns: - String value of readonly setting to use + "0" = writes allowed + "1" = read-only mode (allows SET of non-privileged settings) + "2" = strict read-only (server enforced; disallows SET) + None = use server default (shouldn't happen in practice) """ - read_only = client.server_settings.get("readonly") - if read_only: - if read_only == "0": - return "1" # Force read-only mode if server has it disabled - else: - return read_only.value # Respect server's readonly setting (likely 2) - else: - return "1" # Default to basic read-only mode if setting isn't present + config = get_config() + server_settings = getattr(client, "server_settings", {}) or {} + server_readonly = _normalize_readonly_value(server_settings.get("readonly")) + + # Case 1: User wants write access (CLICKHOUSE_READ_ONLY=false) + if not config.read_only: + if server_readonly in (None, "0"): + logger.info("Write mode enabled (CLICKHOUSE_READ_ONLY=false)") + return "0" + + # If server forbids writes, respect server configuration + logger.warning( + "CLICKHOUSE_READ_ONLY=false but server enforces readonly=%s; " + "write operations will fail", + server_readonly, + ) + return server_readonly + + # Case 2: User wants read-only mode (CLICKHOUSE_READ_ONLY=true, default) + if server_readonly in (None, "0"): + return "1" # Enforce read-only since server allows writes + + return server_readonly # Server already enforces readonly, respect it + + +def _normalize_readonly_value(value: Any) -> Optional[str]: + """Normalize ClickHouse readonly setting to a simple string. + + The clickhouse_connect library represents settings as objects with a .value attribute. + This function extracts the actual value for our logic. + """ + if value is None: + return None + + # Extract value from clickhouse_connect setting object + if hasattr(value, "value"): + value = value.value + + return str(value) def create_chdb_client(): @@ -346,7 +439,7 @@ def _init_chdb_client(): if os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true": mcp.add_tool(Tool.from_function(list_databases)) mcp.add_tool(Tool.from_function(list_tables)) - mcp.add_tool(Tool.from_function(run_select_query)) + mcp.add_tool(Tool.from_function(run_query, description=_get_query_tool_description())) logger.info("ClickHouse tools registered") diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 28c4420..59bd747 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -189,7 +189,7 @@ async def test_run_select_query_success(mcp_server, setup_test_database): async with Client(mcp_server) as client: query = f"SELECT id, name, age FROM {test_db}.{test_table} ORDER BY id" - result = await client.call_tool("run_select_query", {"query": query}) + result = await client.call_tool("run_query", {"query": query}) query_result = json.loads(result.content[0].text) @@ -215,7 +215,7 @@ async def test_run_select_query_with_aggregation(mcp_server, setup_test_database async with Client(mcp_server) as client: query = f"SELECT COUNT(*) as count, AVG(age) as avg_age FROM {test_db}.{test_table}" - result = await client.call_tool("run_select_query", {"query": query}) + result = await client.call_tool("run_query", {"query": query}) query_result = json.loads(result.content[0].text) @@ -243,7 +243,7 @@ async def test_run_select_query_with_join(mcp_server, setup_test_database): COUNT(DISTINCT event_type) as event_types_count FROM {test_db}.{test_table2} """ - result = await client.call_tool("run_select_query", {"query": query}) + result = await client.call_tool("run_query", {"query": query}) query_result = json.loads(result.content[0].text) assert query_result["rows"][0][0] == 3 # login, logout, purchase @@ -260,7 +260,7 @@ async def test_run_select_query_error(mcp_server, setup_test_database): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"query": query}) + await client.call_tool("run_query", {"query": query}) assert "Query execution failed" in str(exc_info.value) @@ -274,7 +274,7 @@ async def test_run_select_query_syntax_error(mcp_server): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"query": query}) + await client.call_tool("run_query", {"query": query}) assert "Query execution failed" in str(exc_info.value) @@ -352,7 +352,7 @@ async def test_concurrent_queries(mcp_server, setup_test_database): # Execute all queries concurrently results = await asyncio.gather( - *[client.call_tool("run_select_query", {"query": query}) for query in queries] + *[client.call_tool("run_query", {"query": query}) for query in queries] ) # Verify all queries succeeded diff --git a/tests/test_tool.py b/tests/test_tool.py index 50878c4..c8c0cc5 100644 --- a/tests/test_tool.py +++ b/tests/test_tool.py @@ -1,10 +1,12 @@ -import unittest import json +import os +import unittest +from unittest.mock import patch from dotenv import load_dotenv from fastmcp.exceptions import ToolError -from mcp_clickhouse import create_clickhouse_client, list_databases, list_tables, run_select_query +from mcp_clickhouse import create_clickhouse_client, list_databases, list_tables, run_query load_dotenv() @@ -62,22 +64,22 @@ def test_list_tables_with_like(self): self.assertEqual(len(result), 1) self.assertEqual(result[0]["name"], self.test_table) - def test_run_select_query_success(self): + def test_run_query_success(self): """Test running a SELECT query successfully.""" query = f"SELECT * FROM {self.test_db}.{self.test_table}" - result = run_select_query(query) + result = run_query(query) self.assertIsInstance(result, dict) self.assertEqual(len(result["rows"]), 2) self.assertEqual(result["rows"][0][0], 1) self.assertEqual(result["rows"][0][1], "Alice") - def test_run_select_query_failure(self): + def test_run_query_failure(self): """Test running a SELECT query with an error.""" query = f"SELECT * FROM {self.test_db}.non_existent_table" # Should raise ToolError with self.assertRaises(ToolError) as context: - run_select_query(query) + run_query(query) self.assertIn("Query execution failed", str(context.exception)) @@ -99,5 +101,305 @@ def test_table_and_column_comments(self): self.assertEqual(columns["name"]["comment"], "User name field") +@patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "true"}) +class TestClickhouseWriteMode(unittest.TestCase): + """Tests for write mode functionality (CLICKHOUSE_READ_ONLY=false). + + Note: These tests use @patch.dict to temporarily set CLICKHOUSE_READ_ONLY=false + and CLICKHOUSE_ALLOW_DROP=true without affecting other tests. This allows testing + write operations in isolation. + """ + + @classmethod + def setUpClass(cls): + """Set up the environment before tests.""" + cls.client = create_clickhouse_client() + cls.test_db = "test_write_mode_db" + cls.test_table = "write_test_table" + + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + cls.client.command(f"CREATE DATABASE {cls.test_db}") + + @classmethod + def tearDownClass(cls): + """Clean up the environment after tests.""" + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + + def test_insert_query(self): + """Test that INSERT queries work when writes are enabled.""" + create_query = f""" + CREATE TABLE {self.test_db}.{self.test_table} ( + id UInt32, + value String + ) ENGINE = MergeTree() + ORDER BY id + """ + result = run_query(create_query) + self.assertIsInstance(result, dict) + + insert_query = f""" + INSERT INTO {self.test_db}.{self.test_table} (id, value) + VALUES (1, 'test_value') + """ + result = run_query(insert_query) + self.assertIsInstance(result, dict) + + select_query = f"SELECT * FROM {self.test_db}.{self.test_table}" + result = run_query(select_query) + self.assertEqual(len(result["rows"]), 1) + self.assertEqual(result["rows"][0][0], 1) + self.assertEqual(result["rows"][0][1], "test_value") + + self.client.command(f"DROP TABLE {self.test_db}.{self.test_table}") + + def test_create_table_query(self): + """Test that CREATE TABLE queries work when writes are enabled.""" + create_query = f""" + CREATE TABLE {self.test_db}.ddl_test ( + id UInt32, + name String + ) ENGINE = MergeTree() + ORDER BY id + """ + result = run_query(create_query) + self.assertIsInstance(result, dict) + + tables = list_tables(self.test_db) + table_names = [t["name"] for t in tables] + self.assertIn("ddl_test", table_names) + + self.client.command(f"DROP TABLE {self.test_db}.ddl_test") + + def test_alter_table_query(self): + """Test that ALTER TABLE queries work when writes are enabled.""" + self.client.command(f""" + CREATE TABLE {self.test_db}.alter_test ( + id UInt32 + ) ENGINE = MergeTree() + ORDER BY id + """) + + alter_query = f""" + ALTER TABLE {self.test_db}.alter_test + ADD COLUMN name String + """ + result = run_query(alter_query) + self.assertIsInstance(result, dict) + + tables = list_tables(self.test_db, like="alter_test") + self.assertEqual(len(tables), 1) + column_names = [col["name"] for col in tables[0]["columns"]] + self.assertIn("name", column_names) + + self.client.command(f"DROP TABLE {self.test_db}.alter_test") + + +@patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "true"}) +class TestClickhouseDropProtection(unittest.TestCase): + """Tests for DROP operation protection. + + These tests verify that DROP operations (DROP TABLE, DROP DATABASE) are + properly controlled by the CLICKHOUSE_ALLOW_DROP flag when writes are enabled. + """ + + @classmethod + def setUpClass(cls): + """Set up the environment before tests.""" + cls.client = create_clickhouse_client() + cls.test_db = "test_drop_protection_db" + cls.test_table = "drop_test_table" + + # Use direct client commands for setup (bypassing run_query) + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + cls.client.command(f"CREATE DATABASE {cls.test_db}") + cls.client.command(f""" + CREATE TABLE {cls.test_db}.{cls.test_table} ( + id UInt32, + value String + ) ENGINE = MergeTree() + ORDER BY id + """) + + @classmethod + def tearDownClass(cls): + """Clean up the environment after tests.""" + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + + @patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "false"}) + def test_drop_table_blocked_when_flag_not_set(self): + """Test that DROP TABLE is blocked when CLICKHOUSE_ALLOW_DROP=false.""" + drop_query = f"DROP TABLE {self.test_db}.{self.test_table}" + + # Should raise ToolError due to DROP protection + with self.assertRaises(ToolError) as context: + run_query(drop_query) + + error_msg = str(context.exception) + self.assertIn("DROP operations are not allowed", error_msg) + self.assertIn("CLICKHOUSE_ALLOW_DROP=true", error_msg) + + @patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "false"}) + def test_drop_database_blocked_when_flag_not_set(self): + """Test that DROP DATABASE is blocked when CLICKHOUSE_ALLOW_DROP=false.""" + temp_db = "test_temp_drop_db" + self.client.command(f"CREATE DATABASE IF NOT EXISTS {temp_db}") + + drop_query = f"DROP DATABASE {temp_db}" + + # Should raise ToolError due to DROP protection + with self.assertRaises(ToolError) as context: + run_query(drop_query) + + error_msg = str(context.exception) + self.assertIn("DROP operations are not allowed", error_msg) + self.assertIn("CLICKHOUSE_ALLOW_DROP=true", error_msg) + + self.client.command(f"DROP DATABASE IF EXISTS {temp_db}") + + def test_drop_allowed_when_flag_set(self): + """Test that DROP works when CLICKHOUSE_ALLOW_DROP=true.""" + # This test runs with ALLOW_DROP=true from the class decorator + temp_table = "temp_drop_table" + self.client.command(f""" + CREATE TABLE {self.test_db}.{temp_table} ( + id UInt32 + ) ENGINE = MergeTree() + ORDER BY id + """) + + # Should succeed + drop_query = f"DROP TABLE {self.test_db}.{temp_table}" + result = run_query(drop_query) + self.assertIsInstance(result, dict) + + @patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "false"}) + def test_insert_allowed_without_drop_flag(self): + """Test that INSERT works even when CLICKHOUSE_ALLOW_DROP=false.""" + insert_query = f""" + INSERT INTO {self.test_db}.{self.test_table} (id, value) + VALUES (1, 'test_value') + """ + result = run_query(insert_query) + self.assertIsInstance(result, dict) + + select_query = f"SELECT * FROM {self.test_db}.{self.test_table}" + result = run_query(select_query) + self.assertGreaterEqual(len(result["rows"]), 1) + + @patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "false"}) + def test_create_allowed_without_drop_flag(self): + """Test that CREATE TABLE works even when CLICKHOUSE_ALLOW_DROP=false.""" + create_query = f""" + CREATE TABLE {self.test_db}.create_test ( + id UInt32 + ) ENGINE = MergeTree() + ORDER BY id + """ + result = run_query(create_query) + self.assertIsInstance(result, dict) + + tables = list_tables(self.test_db) + table_names = [t["name"] for t in tables] + self.assertIn("create_test", table_names) + + self.client.command(f"DROP TABLE {self.test_db}.create_test") + + +class TestClickhouseReadOnlyMode(unittest.TestCase): + """Tests for read-only mode functionality (CLICKHOUSE_READ_ONLY=true, default). + + These tests verify that write operations are properly blocked when + CLICKHOUSE_READ_ONLY is true (the default setting). + """ + + @classmethod + def setUpClass(cls): + """Set up the environment before tests.""" + cls.env_patcher = patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "true"}) + cls.env_patcher.start() + + cls.client = create_clickhouse_client() + cls.test_db = "test_readonly_db" + cls.test_table = "readonly_test_table" + + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + cls.client.command(f"CREATE DATABASE {cls.test_db}") + cls.client.command(f""" + CREATE TABLE {cls.test_db}.{cls.test_table} ( + id UInt32, + value String + ) ENGINE = MergeTree() + ORDER BY id + """) + + @classmethod + def tearDownClass(cls): + """Clean up the environment after tests.""" + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + cls.env_patcher.stop() + + def test_insert_blocked_in_readonly_mode(self): + """Test that INSERT queries fail when CLICKHOUSE_READ_ONLY=true.""" + insert_query = f""" + INSERT INTO {self.test_db}.{self.test_table} (id, value) + VALUES (1, 'should_fail') + """ + + with self.assertRaises(ToolError) as context: + run_query(insert_query) + + error_msg = str(context.exception) + self.assertIn("Query execution failed", error_msg) + self.assertTrue( + "readonly" in error_msg.lower() or "cannot execute" in error_msg.lower(), + f"Expected readonly-related error, got: {error_msg}", + ) + + def test_create_table_blocked_in_readonly_mode(self): + """Test that CREATE TABLE queries fail when CLICKHOUSE_READ_ONLY=true.""" + create_query = f""" + CREATE TABLE {self.test_db}.should_not_exist ( + id UInt32 + ) ENGINE = MergeTree() + ORDER BY id + """ + + with self.assertRaises(ToolError) as context: + run_query(create_query) + + error_msg = str(context.exception) + self.assertIn("Query execution failed", error_msg) + self.assertTrue( + "readonly" in error_msg.lower() or "cannot execute" in error_msg.lower(), + f"Expected readonly-related error, got: {error_msg}", + ) + + def test_alter_table_blocked_in_readonly_mode(self): + """Test that ALTER TABLE queries fail when CLICKHOUSE_READ_ONLY=true.""" + alter_query = f""" + ALTER TABLE {self.test_db}.{self.test_table} + ADD COLUMN new_column String + """ + + with self.assertRaises(ToolError) as context: + run_query(alter_query) + + error_msg = str(context.exception) + self.assertIn("Query execution failed", error_msg) + self.assertTrue( + "readonly" in error_msg.lower() or "cannot execute" in error_msg.lower(), + f"Expected readonly-related error, got: {error_msg}", + ) + + def test_select_allowed_in_readonly_mode(self): + """Test that SELECT queries work normally in read-only mode.""" + select_query = f"SELECT * FROM {self.test_db}.{self.test_table}" + result = run_query(select_query) + + self.assertIsInstance(result, dict) + self.assertIn("columns", result) + self.assertIn("rows", result) + + if __name__ == "__main__": unittest.main() From 51787e3ba425e922540212f6e7ac754015759536 Mon Sep 17 00:00:00 2001 From: Joe S Date: Thu, 23 Oct 2025 13:04:25 -0700 Subject: [PATCH 2/4] change write access env var name --- README.md | 20 ++++++++++---------- mcp_clickhouse/mcp_env.py | 14 +++++++------- mcp_clickhouse/mcp_server.py | 26 ++++++++++++++------------ tests/test_tool.py | 28 ++++++++++++++-------------- 4 files changed, 45 insertions(+), 43 deletions(-) diff --git a/README.md b/README.md index 18eea8b..5e33f49 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ An MCP server for ClickHouse. * `run_query` * Execute SQL queries on your ClickHouse cluster. * Input: `query` (string): The SQL query to execute. - * Queries run in read-only mode by default (`CLICKHOUSE_READ_ONLY=true`), but writes can be enabled explicitly if needed. + * Queries run in read-only mode by default (`CLICKHOUSE_ALLOW_WRITE_ACCESS=false`), but writes can be enabled explicitly if needed. * `list_databases` * List all databases on your ClickHouse cluster. @@ -174,22 +174,22 @@ You can also enable both ClickHouse and chDB simultaneously: ### Optional Write Access -By default, this MCP enforces read-only queries so that accidental mutations cannot happen during exploration. To allow DDL or INSERT/UPDATE statements, set the `CLICKHOUSE_READ_ONLY` environment variable to `false`. The server keeps enforcing read-only mode if the ClickHouse instance itself disallows writes. +By default, this MCP enforces read-only queries so that accidental mutations cannot happen during exploration. To allow DDL or INSERT/UPDATE statements, set the `CLICKHOUSE_ALLOW_WRITE_ACCESS` environment variable to `true`. The server keeps enforcing read-only mode if the ClickHouse instance itself disallows writes. ### DROP Operation Protection -Even when write access is enabled (`CLICKHOUSE_READ_ONLY=false`), DROP operations (DROP TABLE, DROP DATABASE, DROP VIEW, DROP DICTIONARY) require an additional opt-in flag for safety. This prevents accidental data deletion during AI exploration. +Even when write access is enabled (`CLICKHOUSE_ALLOW_WRITE_ACCESS=true`), DROP operations (DROP TABLE, DROP DATABASE, DROP VIEW, DROP DICTIONARY) require an additional opt-in flag for safety. This prevents accidental data deletion during AI exploration. To enable DROP operations, set both flags: ```json "env": { - "CLICKHOUSE_READ_ONLY": "false", + "CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "true" } ``` This two-tier approach ensures that accidental drops are very difficult: -- **Write operations** (INSERT, UPDATE, CREATE) require `CLICKHOUSE_READ_ONLY=false` +- **Write operations** (INSERT, UPDATE, CREATE) require `CLICKHOUSE_ALLOW_WRITE_ACCESS=true` - **Destructive operations** (DROP) additionally require `CLICKHOUSE_ALLOW_DROP=true` ### Running Without uv (Using System Python) @@ -341,13 +341,13 @@ The following environment variables are used to configure the ClickHouse and chD * `CLICKHOUSE_ENABLED`: Enable/disable ClickHouse functionality * Default: `"true"` * Set to `"false"` to disable ClickHouse tools when using chDB only -* `CLICKHOUSE_READ_ONLY`: Force read-only query mode - * Default: `"true"` - * Set to `"false"` to allow DDL (CREATE, ALTER, DROP) and DML (INSERT, UPDATE, DELETE) operations - * When enabled, queries run with `readonly=1` setting to prevent data modifications +* `CLICKHOUSE_ALLOW_WRITE_ACCESS`: Allow write operations (DDL and DML) + * Default: `"false"` + * Set to `"true"` to allow DDL (CREATE, ALTER, DROP) and DML (INSERT, UPDATE, DELETE) operations + * When disabled (default), queries run with `readonly=1` setting to prevent data modifications * `CLICKHOUSE_ALLOW_DROP`: Allow DROP operations (DROP TABLE, DROP DATABASE, DROP VIEW, DROP DICTIONARY) * Default: `"false"` - * Only takes effect when `CLICKHOUSE_READ_ONLY=false` + * Only takes effect when `CLICKHOUSE_ALLOW_WRITE_ACCESS=true` is also set * Set to `"true"` to explicitly allow destructive DROP operations * This is a safety feature to prevent accidental data deletion during AI exploration diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index 05023ff..976961e 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -44,8 +44,8 @@ class ClickHouseConfig: CLICKHOUSE_DATABASE: Default database to use (default: None) CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) - CLICKHOUSE_READ_ONLY: Force read-only queries (default: true) - CLICKHOUSE_ALLOW_DROP: Allow DROP operations when writes are enabled (default: false) + CLICKHOUSE_ALLOW_WRITE_ACCESS: Allow write operations (DDL and DML) (default: false) + CLICKHOUSE_ALLOW_DROP: Allow DROP operations when writes are also enabled (default: false) """ def __init__(self): @@ -129,19 +129,19 @@ def proxy_path(self) -> str: return os.getenv("CLICKHOUSE_PROXY_PATH") @property - def read_only(self) -> bool: - """Get whether queries should be forced to read-only mode. + def allow_write_access(self) -> bool: + """Get whether write operations (DDL and DML) are allowed. - Default: True + Default: False """ - return os.getenv("CLICKHOUSE_READ_ONLY", "true").lower() == "true" + return os.getenv("CLICKHOUSE_ALLOW_WRITE_ACCESS", "false").lower() == "true" @property def allow_drop(self) -> bool: """Get whether DROP operations (DROP TABLE, DROP DATABASE) are allowed. This setting provides an additional safety layer when write access is enabled. - Even with CLICKHOUSE_READ_ONLY=false, DROP operations require this flag. + Even with CLICKHOUSE_ALLOW_WRITE_ACCESS=true, DROP operations require this flag. Default: False """ diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 04567d4..148b937 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -179,9 +179,11 @@ def _validate_query_for_drop(query: str) -> None: """ config = get_config() - if config.read_only: + # If writes are not enabled, skip this check (readonly mode will catch it anyway) + if not config.allow_write_access: return + # If DROP is explicitly allowed, no validation needed if config.allow_drop: return @@ -214,7 +216,7 @@ def execute_query(query: str): def run_query(query: str): """Execute a SQL query against ClickHouse. - Queries run in read-only mode by default (readonly=1). Set CLICKHOUSE_READ_ONLY=false + Queries run in read-only mode by default. Set CLICKHOUSE_ALLOW_WRITE_ACCESS=true to allow DDL and DML statements when your ClickHouse server permits them. """ logger.info(f"Executing query: {query}") @@ -246,16 +248,16 @@ def run_query(query: str): def _get_query_tool_description() -> str: config = get_config() - if config.read_only: + if not config.allow_write_access: return ( "Execute SQL queries in ClickHouse. Queries run in read-only mode by default. " - "Set CLICKHOUSE_READ_ONLY=false to allow DDL and DML when appropriate." + "Set CLICKHOUSE_ALLOW_WRITE_ACCESS=true to allow DDL and DML when appropriate." ) drop_status = "DROP operations are allowed" if config.allow_drop else "DROP operations are blocked (set CLICKHOUSE_ALLOW_DROP=true to enable)" return ( f"Execute SQL queries in ClickHouse with writes enabled. " - f"CLICKHOUSE_READ_ONLY=false so DDL and DML statements are permitted. " + f"CLICKHOUSE_ALLOW_WRITE_ACCESS=true so DDL and DML statements are permitted. " f"{drop_status}." ) @@ -296,12 +298,12 @@ def get_readonly_setting(client) -> Optional[str]: """Determine the readonly setting value for queries. This implements the following logic: - 1. If CLICKHOUSE_READ_ONLY=false (writes enabled): + 1. If CLICKHOUSE_ALLOW_WRITE_ACCESS=true (writes enabled): - Allow writes if server permits (server readonly=None or "0") - Fall back to server's readonly setting if server enforces it - Log a warning when falling back - 2. If CLICKHOUSE_READ_ONLY=true (default, read-only mode): + 2. If CLICKHOUSE_ALLOW_WRITE_ACCESS=false (default, read-only mode): - Enforce readonly=1 if server allows writes - Respect server's readonly setting if server enforces stricter mode @@ -315,21 +317,21 @@ def get_readonly_setting(client) -> Optional[str]: server_settings = getattr(client, "server_settings", {}) or {} server_readonly = _normalize_readonly_value(server_settings.get("readonly")) - # Case 1: User wants write access (CLICKHOUSE_READ_ONLY=false) - if not config.read_only: + # Case 1: User wants write access (CLICKHOUSE_ALLOW_WRITE_ACCESS=true) + if config.allow_write_access: if server_readonly in (None, "0"): - logger.info("Write mode enabled (CLICKHOUSE_READ_ONLY=false)") + logger.info("Write mode enabled (CLICKHOUSE_ALLOW_WRITE_ACCESS=true)") return "0" # If server forbids writes, respect server configuration logger.warning( - "CLICKHOUSE_READ_ONLY=false but server enforces readonly=%s; " + "CLICKHOUSE_ALLOW_WRITE_ACCESS=true but server enforces readonly=%s; " "write operations will fail", server_readonly, ) return server_readonly - # Case 2: User wants read-only mode (CLICKHOUSE_READ_ONLY=true, default) + # Case 2: User wants read-only mode (CLICKHOUSE_ALLOW_WRITE_ACCESS=false, default) if server_readonly in (None, "0"): return "1" # Enforce read-only since server allows writes diff --git a/tests/test_tool.py b/tests/test_tool.py index c8c0cc5..ce6b1ed 100644 --- a/tests/test_tool.py +++ b/tests/test_tool.py @@ -101,11 +101,11 @@ def test_table_and_column_comments(self): self.assertEqual(columns["name"]["comment"], "User name field") -@patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "true"}) +@patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "true"}) class TestClickhouseWriteMode(unittest.TestCase): - """Tests for write mode functionality (CLICKHOUSE_READ_ONLY=false). + """Tests for write mode functionality (CLICKHOUSE_ALLOW_WRITE_ACCESS=true). - Note: These tests use @patch.dict to temporarily set CLICKHOUSE_READ_ONLY=false + Note: These tests use @patch.dict to temporarily set CLICKHOUSE_ALLOW_WRITE_ACCESS=true and CLICKHOUSE_ALLOW_DROP=true without affecting other tests. This allows testing write operations in isolation. """ @@ -194,7 +194,7 @@ def test_alter_table_query(self): self.client.command(f"DROP TABLE {self.test_db}.alter_test") -@patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "true"}) +@patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "true"}) class TestClickhouseDropProtection(unittest.TestCase): """Tests for DROP operation protection. @@ -225,7 +225,7 @@ def tearDownClass(cls): """Clean up the environment after tests.""" cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") - @patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "false"}) + @patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "false"}) def test_drop_table_blocked_when_flag_not_set(self): """Test that DROP TABLE is blocked when CLICKHOUSE_ALLOW_DROP=false.""" drop_query = f"DROP TABLE {self.test_db}.{self.test_table}" @@ -238,7 +238,7 @@ def test_drop_table_blocked_when_flag_not_set(self): self.assertIn("DROP operations are not allowed", error_msg) self.assertIn("CLICKHOUSE_ALLOW_DROP=true", error_msg) - @patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "false"}) + @patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "false"}) def test_drop_database_blocked_when_flag_not_set(self): """Test that DROP DATABASE is blocked when CLICKHOUSE_ALLOW_DROP=false.""" temp_db = "test_temp_drop_db" @@ -272,7 +272,7 @@ def test_drop_allowed_when_flag_set(self): result = run_query(drop_query) self.assertIsInstance(result, dict) - @patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "false"}) + @patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "false"}) def test_insert_allowed_without_drop_flag(self): """Test that INSERT works even when CLICKHOUSE_ALLOW_DROP=false.""" insert_query = f""" @@ -286,7 +286,7 @@ def test_insert_allowed_without_drop_flag(self): result = run_query(select_query) self.assertGreaterEqual(len(result["rows"]), 1) - @patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "false", "CLICKHOUSE_ALLOW_DROP": "false"}) + @patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "false"}) def test_create_allowed_without_drop_flag(self): """Test that CREATE TABLE works even when CLICKHOUSE_ALLOW_DROP=false.""" create_query = f""" @@ -306,16 +306,16 @@ def test_create_allowed_without_drop_flag(self): class TestClickhouseReadOnlyMode(unittest.TestCase): - """Tests for read-only mode functionality (CLICKHOUSE_READ_ONLY=true, default). + """Tests for read-only mode functionality (CLICKHOUSE_ALLOW_WRITE_ACCESS=false, default). These tests verify that write operations are properly blocked when - CLICKHOUSE_READ_ONLY is true (the default setting). + CLICKHOUSE_ALLOW_WRITE_ACCESS is false (the default setting). """ @classmethod def setUpClass(cls): """Set up the environment before tests.""" - cls.env_patcher = patch.dict(os.environ, {"CLICKHOUSE_READ_ONLY": "true"}) + cls.env_patcher = patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "false"}) cls.env_patcher.start() cls.client = create_clickhouse_client() @@ -339,7 +339,7 @@ def tearDownClass(cls): cls.env_patcher.stop() def test_insert_blocked_in_readonly_mode(self): - """Test that INSERT queries fail when CLICKHOUSE_READ_ONLY=true.""" + """Test that INSERT queries fail when CLICKHOUSE_ALLOW_WRITE_ACCESS=false.""" insert_query = f""" INSERT INTO {self.test_db}.{self.test_table} (id, value) VALUES (1, 'should_fail') @@ -356,7 +356,7 @@ def test_insert_blocked_in_readonly_mode(self): ) def test_create_table_blocked_in_readonly_mode(self): - """Test that CREATE TABLE queries fail when CLICKHOUSE_READ_ONLY=true.""" + """Test that CREATE TABLE queries fail when CLICKHOUSE_ALLOW_WRITE_ACCESS=false.""" create_query = f""" CREATE TABLE {self.test_db}.should_not_exist ( id UInt32 @@ -375,7 +375,7 @@ def test_create_table_blocked_in_readonly_mode(self): ) def test_alter_table_blocked_in_readonly_mode(self): - """Test that ALTER TABLE queries fail when CLICKHOUSE_READ_ONLY=true.""" + """Test that ALTER TABLE queries fail when CLICKHOUSE_ALLOW_WRITE_ACCESS=false.""" alter_query = f""" ALTER TABLE {self.test_db}.{self.test_table} ADD COLUMN new_column String From ce1a4c7a75b84e064c86419f93a0b1ccc228e2d1 Mon Sep 17 00:00:00 2001 From: Joe S Date: Thu, 23 Oct 2025 13:36:45 -0700 Subject: [PATCH 3/4] address pr suggestions --- mcp_clickhouse/mcp_server.py | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 148b937..7920c4b 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -188,10 +188,8 @@ def _validate_query_for_drop(query: str) -> None: return # Simple pattern matching for DROP operations - query_upper = query.upper() - drop_pattern = r'\bDROP\s+(TABLE|DATABASE|VIEW|DICTIONARY)\b' - if re.search(drop_pattern, query_upper): + if re.search(drop_pattern, query, re.IGNORECASE): raise ToolError( "DROP operations are not allowed. " "Set CLICKHOUSE_ALLOW_DROP=true to enable DROP TABLE/DATABASE operations. " @@ -343,6 +341,16 @@ def _normalize_readonly_value(value: Any) -> Optional[str]: The clickhouse_connect library represents settings as objects with a .value attribute. This function extracts the actual value for our logic. + + Args: + value: The readonly setting value from ClickHouse server. Can be: + - None (server has no readonly restriction) + - A clickhouse_connect setting object with a .value attribute + - An int (0, 1, 2) + - A str ("0", "1", "2") + + Returns: + Optional[str]: Normalized readonly value as string ("0", "1", "2") or None """ if value is None: return None From 0afed23f2b1d2c43a9ade5f557e20bbd959bdf18 Mon Sep 17 00:00:00 2001 From: Joe S Date: Thu, 23 Oct 2025 13:47:05 -0700 Subject: [PATCH 4/4] change tool desc to be static, not run on import --- mcp_clickhouse/mcp_server.py | 25 ++++++++----------------- 1 file changed, 8 insertions(+), 17 deletions(-) diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 7920c4b..d1d3bd7 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -244,22 +244,6 @@ def run_query(query: str): raise RuntimeError(f"Unexpected error during query execution: {str(e)}") -def _get_query_tool_description() -> str: - config = get_config() - if not config.allow_write_access: - return ( - "Execute SQL queries in ClickHouse. Queries run in read-only mode by default. " - "Set CLICKHOUSE_ALLOW_WRITE_ACCESS=true to allow DDL and DML when appropriate." - ) - - drop_status = "DROP operations are allowed" if config.allow_drop else "DROP operations are blocked (set CLICKHOUSE_ALLOW_DROP=true to enable)" - return ( - f"Execute SQL queries in ClickHouse with writes enabled. " - f"CLICKHOUSE_ALLOW_WRITE_ACCESS=true so DDL and DML statements are permitted. " - f"{drop_status}." - ) - - def create_clickhouse_client(): client_config = get_config().get_client_config() logger.info( @@ -449,7 +433,14 @@ def _init_chdb_client(): if os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true": mcp.add_tool(Tool.from_function(list_databases)) mcp.add_tool(Tool.from_function(list_tables)) - mcp.add_tool(Tool.from_function(run_query, description=_get_query_tool_description())) + mcp.add_tool(Tool.from_function( + run_query, + description=( + "Execute SQL queries in ClickHouse. Queries run in read-only mode by default. " + "Set CLICKHOUSE_ALLOW_WRITE_ACCESS=true to allow DDL and DML operations. " + "Set CLICKHOUSE_ALLOW_DROP=true to additionally allow DROP operations." + ) + )) logger.info("ClickHouse tools registered")