diff --git a/README.md b/README.md index 3db4b9e..5e33f49 100644 --- a/README.md +++ b/README.md @@ -10,10 +10,10 @@ An MCP server for ClickHouse. ### ClickHouse Tools -* `run_select_query` +* `run_query` * Execute SQL queries on your ClickHouse cluster. - * Input: `sql` (string): The SQL query to execute. - * All ClickHouse queries are run with `readonly = 1` to ensure they are safe. + * Input: `query` (string): The SQL query to execute. + * Queries run in read-only mode by default (`CLICKHOUSE_ALLOW_WRITE_ACCESS=false`), but writes can be enabled explicitly if needed. * `list_databases` * List all databases on your ClickHouse cluster. @@ -26,7 +26,7 @@ An MCP server for ClickHouse. * `run_chdb_select_query` * Execute SQL queries using [chDB](https://github.com/chdb-io/chdb)'s embedded ClickHouse engine. - * Input: `sql` (string): The SQL query to execute. + * Input: `query` (string): The SQL query to execute. * Query data directly from various sources (files, URLs, databases) without ETL processes. ### Health Check Endpoint @@ -172,6 +172,26 @@ You can also enable both ClickHouse and chDB simultaneously: 4. Restart Claude Desktop to apply the changes. +### Optional Write Access + +By default, this MCP enforces read-only queries so that accidental mutations cannot happen during exploration. To allow DDL or INSERT/UPDATE statements, set the `CLICKHOUSE_ALLOW_WRITE_ACCESS` environment variable to `true`. The server keeps enforcing read-only mode if the ClickHouse instance itself disallows writes. + +### DROP Operation Protection + +Even when write access is enabled (`CLICKHOUSE_ALLOW_WRITE_ACCESS=true`), DROP operations (DROP TABLE, DROP DATABASE, DROP VIEW, DROP DICTIONARY) require an additional opt-in flag for safety. This prevents accidental data deletion during AI exploration. + +To enable DROP operations, set both flags: +```json +"env": { + "CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", + "CLICKHOUSE_ALLOW_DROP": "true" +} +``` + +This two-tier approach ensures that accidental drops are very difficult: +- **Write operations** (INSERT, UPDATE, CREATE) require `CLICKHOUSE_ALLOW_WRITE_ACCESS=true` +- **Destructive operations** (DROP) additionally require `CLICKHOUSE_ALLOW_DROP=true` + ### Running Without uv (Using System Python) If you prefer to use the system Python installation instead of uv, you can install the package from PyPI and run it directly: @@ -321,6 +341,15 @@ The following environment variables are used to configure the ClickHouse and chD * `CLICKHOUSE_ENABLED`: Enable/disable ClickHouse functionality * Default: `"true"` * Set to `"false"` to disable ClickHouse tools when using chDB only +* `CLICKHOUSE_ALLOW_WRITE_ACCESS`: Allow write operations (DDL and DML) + * Default: `"false"` + * Set to `"true"` to allow DDL (CREATE, ALTER, DROP) and DML (INSERT, UPDATE, DELETE) operations + * When disabled (default), queries run with `readonly=1` setting to prevent data modifications +* `CLICKHOUSE_ALLOW_DROP`: Allow DROP operations (DROP TABLE, DROP DATABASE, DROP VIEW, DROP DICTIONARY) + * Default: `"false"` + * Only takes effect when `CLICKHOUSE_ALLOW_WRITE_ACCESS=true` is also set + * Set to `"true"` to explicitly allow destructive DROP operations + * This is a safety feature to prevent accidental data deletion during AI exploration #### chDB Variables diff --git a/mcp_clickhouse/__init__.py b/mcp_clickhouse/__init__.py index c442d9c..282af0d 100644 --- a/mcp_clickhouse/__init__.py +++ b/mcp_clickhouse/__init__.py @@ -4,7 +4,7 @@ create_clickhouse_client, list_databases, list_tables, - run_select_query, + run_query, create_chdb_client, run_chdb_select_query, chdb_initial_prompt, @@ -21,7 +21,7 @@ __all__ = [ "list_databases", "list_tables", - "run_select_query", + "run_query", "create_clickhouse_client", "create_chdb_client", "run_chdb_select_query", diff --git a/mcp_clickhouse/mcp_env.py b/mcp_clickhouse/mcp_env.py index 64e0e8a..976961e 100644 --- a/mcp_clickhouse/mcp_env.py +++ b/mcp_clickhouse/mcp_env.py @@ -44,6 +44,8 @@ class ClickHouseConfig: CLICKHOUSE_DATABASE: Default database to use (default: None) CLICKHOUSE_PROXY_PATH: Path to be added to the host URL. For instance, for servers behind an HTTP proxy (default: None) CLICKHOUSE_ENABLED: Enable ClickHouse server (default: true) + CLICKHOUSE_ALLOW_WRITE_ACCESS: Allow write operations (DDL and DML) (default: false) + CLICKHOUSE_ALLOW_DROP: Allow DROP operations when writes are also enabled (default: false) """ def __init__(self): @@ -126,6 +128,25 @@ def send_receive_timeout(self) -> int: def proxy_path(self) -> str: return os.getenv("CLICKHOUSE_PROXY_PATH") + @property + def allow_write_access(self) -> bool: + """Get whether write operations (DDL and DML) are allowed. + + Default: False + """ + return os.getenv("CLICKHOUSE_ALLOW_WRITE_ACCESS", "false").lower() == "true" + + @property + def allow_drop(self) -> bool: + """Get whether DROP operations (DROP TABLE, DROP DATABASE) are allowed. + + This setting provides an additional safety layer when write access is enabled. + Even with CLICKHOUSE_ALLOW_WRITE_ACCESS=true, DROP operations require this flag. + + Default: False + """ + return os.getenv("CLICKHOUSE_ALLOW_DROP", "false").lower() == "true" + def get_client_config(self) -> dict: """Get the configuration dictionary for clickhouse_connect client. diff --git a/mcp_clickhouse/mcp_server.py b/mcp_clickhouse/mcp_server.py index 01639b2..d1d3bd7 100644 --- a/mcp_clickhouse/mcp_server.py +++ b/mcp_clickhouse/mcp_server.py @@ -4,6 +4,7 @@ import concurrent.futures import atexit import os +import re import clickhouse_connect import chdb.session as chs @@ -167,11 +168,42 @@ def list_tables(database: str, like: Optional[str] = None, not_like: Optional[st return [asdict(table) for table in tables] +def _validate_query_for_drop(query: str) -> None: + """Validate that DROP operations are allowed. + + Args: + query: The SQL query to validate + + Raises: + ToolError: If the query contains DROP operations but CLICKHOUSE_ALLOW_DROP is not set + """ + config = get_config() + + # If writes are not enabled, skip this check (readonly mode will catch it anyway) + if not config.allow_write_access: + return + + # If DROP is explicitly allowed, no validation needed + if config.allow_drop: + return + + # Simple pattern matching for DROP operations + drop_pattern = r'\bDROP\s+(TABLE|DATABASE|VIEW|DICTIONARY)\b' + if re.search(drop_pattern, query, re.IGNORECASE): + raise ToolError( + "DROP operations are not allowed. " + "Set CLICKHOUSE_ALLOW_DROP=true to enable DROP TABLE/DATABASE operations. " + "This is a safety feature to prevent accidental data deletion." + ) + + def execute_query(query: str): client = create_clickhouse_client() try: - read_only = get_readonly_setting(client) - res = client.query(query, settings={"readonly": read_only}) + _validate_query_for_drop(query) + + query_settings = build_query_settings(client) + res = client.query(query, settings=query_settings) logger.info(f"Query returned {len(res.result_rows)} rows") return {"columns": res.column_names, "rows": res.result_rows} except Exception as err: @@ -179,9 +211,13 @@ def execute_query(query: str): raise ToolError(f"Query execution failed: {str(err)}") -def run_select_query(query: str): - """Run a SELECT query in a ClickHouse database""" - logger.info(f"Executing SELECT query: {query}") +def run_query(query: str): + """Execute a SQL query against ClickHouse. + + Queries run in read-only mode by default. Set CLICKHOUSE_ALLOW_WRITE_ACCESS=true + to allow DDL and DML statements when your ClickHouse server permits them. + """ + logger.info(f"Executing query: {query}") try: future = QUERY_EXECUTOR.submit(execute_query, query) try: @@ -204,7 +240,7 @@ def run_select_query(query: str): except ToolError: raise except Exception as e: - logger.error(f"Unexpected error in run_select_query: {str(e)}") + logger.error("Unexpected error in run_query: %s", str(e)) raise RuntimeError(f"Unexpected error during query execution: {str(e)}") @@ -229,34 +265,85 @@ def create_clickhouse_client(): raise -def get_readonly_setting(client) -> str: - """Get the appropriate readonly setting value to use for queries. +def build_query_settings(client) -> dict[str, str]: + """Build query settings dict for ClickHouse queries. + + Always returns a dict (possibly empty) to ensure consistent behavior. + """ + readonly_setting = get_readonly_setting(client) + if readonly_setting is not None: + return {"readonly": readonly_setting} + return {} + - This function handles potential conflicts between server and client readonly settings: - - readonly=0: No read-only restrictions - - readonly=1: Only read queries allowed, settings cannot be changed - - readonly=2: Only read queries allowed, settings can be changed (except readonly itself) +def get_readonly_setting(client) -> Optional[str]: + """Determine the readonly setting value for queries. - If server has readonly=2 and client tries to set readonly=1, it would cause: - "Setting readonly is unknown or readonly" error + This implements the following logic: + 1. If CLICKHOUSE_ALLOW_WRITE_ACCESS=true (writes enabled): + - Allow writes if server permits (server readonly=None or "0") + - Fall back to server's readonly setting if server enforces it + - Log a warning when falling back - This function preserves the server's readonly setting unless it's 0, in which case - we enforce readonly=1 to ensure queries are read-only. + 2. If CLICKHOUSE_ALLOW_WRITE_ACCESS=false (default, read-only mode): + - Enforce readonly=1 if server allows writes + - Respect server's readonly setting if server enforces stricter mode + + Returns: + "0" = writes allowed + "1" = read-only mode (allows SET of non-privileged settings) + "2" = strict read-only (server enforced; disallows SET) + None = use server default (shouldn't happen in practice) + """ + config = get_config() + server_settings = getattr(client, "server_settings", {}) or {} + server_readonly = _normalize_readonly_value(server_settings.get("readonly")) + + # Case 1: User wants write access (CLICKHOUSE_ALLOW_WRITE_ACCESS=true) + if config.allow_write_access: + if server_readonly in (None, "0"): + logger.info("Write mode enabled (CLICKHOUSE_ALLOW_WRITE_ACCESS=true)") + return "0" + + # If server forbids writes, respect server configuration + logger.warning( + "CLICKHOUSE_ALLOW_WRITE_ACCESS=true but server enforces readonly=%s; " + "write operations will fail", + server_readonly, + ) + return server_readonly + + # Case 2: User wants read-only mode (CLICKHOUSE_ALLOW_WRITE_ACCESS=false, default) + if server_readonly in (None, "0"): + return "1" # Enforce read-only since server allows writes + + return server_readonly # Server already enforces readonly, respect it + + +def _normalize_readonly_value(value: Any) -> Optional[str]: + """Normalize ClickHouse readonly setting to a simple string. + + The clickhouse_connect library represents settings as objects with a .value attribute. + This function extracts the actual value for our logic. Args: - client: ClickHouse client connection + value: The readonly setting value from ClickHouse server. Can be: + - None (server has no readonly restriction) + - A clickhouse_connect setting object with a .value attribute + - An int (0, 1, 2) + - A str ("0", "1", "2") Returns: - String value of readonly setting to use + Optional[str]: Normalized readonly value as string ("0", "1", "2") or None """ - read_only = client.server_settings.get("readonly") - if read_only: - if read_only == "0": - return "1" # Force read-only mode if server has it disabled - else: - return read_only.value # Respect server's readonly setting (likely 2) - else: - return "1" # Default to basic read-only mode if setting isn't present + if value is None: + return None + + # Extract value from clickhouse_connect setting object + if hasattr(value, "value"): + value = value.value + + return str(value) def create_chdb_client(): @@ -346,7 +433,14 @@ def _init_chdb_client(): if os.getenv("CLICKHOUSE_ENABLED", "true").lower() == "true": mcp.add_tool(Tool.from_function(list_databases)) mcp.add_tool(Tool.from_function(list_tables)) - mcp.add_tool(Tool.from_function(run_select_query)) + mcp.add_tool(Tool.from_function( + run_query, + description=( + "Execute SQL queries in ClickHouse. Queries run in read-only mode by default. " + "Set CLICKHOUSE_ALLOW_WRITE_ACCESS=true to allow DDL and DML operations. " + "Set CLICKHOUSE_ALLOW_DROP=true to additionally allow DROP operations." + ) + )) logger.info("ClickHouse tools registered") diff --git a/tests/test_mcp_server.py b/tests/test_mcp_server.py index 28c4420..59bd747 100644 --- a/tests/test_mcp_server.py +++ b/tests/test_mcp_server.py @@ -189,7 +189,7 @@ async def test_run_select_query_success(mcp_server, setup_test_database): async with Client(mcp_server) as client: query = f"SELECT id, name, age FROM {test_db}.{test_table} ORDER BY id" - result = await client.call_tool("run_select_query", {"query": query}) + result = await client.call_tool("run_query", {"query": query}) query_result = json.loads(result.content[0].text) @@ -215,7 +215,7 @@ async def test_run_select_query_with_aggregation(mcp_server, setup_test_database async with Client(mcp_server) as client: query = f"SELECT COUNT(*) as count, AVG(age) as avg_age FROM {test_db}.{test_table}" - result = await client.call_tool("run_select_query", {"query": query}) + result = await client.call_tool("run_query", {"query": query}) query_result = json.loads(result.content[0].text) @@ -243,7 +243,7 @@ async def test_run_select_query_with_join(mcp_server, setup_test_database): COUNT(DISTINCT event_type) as event_types_count FROM {test_db}.{test_table2} """ - result = await client.call_tool("run_select_query", {"query": query}) + result = await client.call_tool("run_query", {"query": query}) query_result = json.loads(result.content[0].text) assert query_result["rows"][0][0] == 3 # login, logout, purchase @@ -260,7 +260,7 @@ async def test_run_select_query_error(mcp_server, setup_test_database): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"query": query}) + await client.call_tool("run_query", {"query": query}) assert "Query execution failed" in str(exc_info.value) @@ -274,7 +274,7 @@ async def test_run_select_query_syntax_error(mcp_server): # Should raise ToolError with pytest.raises(ToolError) as exc_info: - await client.call_tool("run_select_query", {"query": query}) + await client.call_tool("run_query", {"query": query}) assert "Query execution failed" in str(exc_info.value) @@ -352,7 +352,7 @@ async def test_concurrent_queries(mcp_server, setup_test_database): # Execute all queries concurrently results = await asyncio.gather( - *[client.call_tool("run_select_query", {"query": query}) for query in queries] + *[client.call_tool("run_query", {"query": query}) for query in queries] ) # Verify all queries succeeded diff --git a/tests/test_tool.py b/tests/test_tool.py index 50878c4..ce6b1ed 100644 --- a/tests/test_tool.py +++ b/tests/test_tool.py @@ -1,10 +1,12 @@ -import unittest import json +import os +import unittest +from unittest.mock import patch from dotenv import load_dotenv from fastmcp.exceptions import ToolError -from mcp_clickhouse import create_clickhouse_client, list_databases, list_tables, run_select_query +from mcp_clickhouse import create_clickhouse_client, list_databases, list_tables, run_query load_dotenv() @@ -62,22 +64,22 @@ def test_list_tables_with_like(self): self.assertEqual(len(result), 1) self.assertEqual(result[0]["name"], self.test_table) - def test_run_select_query_success(self): + def test_run_query_success(self): """Test running a SELECT query successfully.""" query = f"SELECT * FROM {self.test_db}.{self.test_table}" - result = run_select_query(query) + result = run_query(query) self.assertIsInstance(result, dict) self.assertEqual(len(result["rows"]), 2) self.assertEqual(result["rows"][0][0], 1) self.assertEqual(result["rows"][0][1], "Alice") - def test_run_select_query_failure(self): + def test_run_query_failure(self): """Test running a SELECT query with an error.""" query = f"SELECT * FROM {self.test_db}.non_existent_table" # Should raise ToolError with self.assertRaises(ToolError) as context: - run_select_query(query) + run_query(query) self.assertIn("Query execution failed", str(context.exception)) @@ -99,5 +101,305 @@ def test_table_and_column_comments(self): self.assertEqual(columns["name"]["comment"], "User name field") +@patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "true"}) +class TestClickhouseWriteMode(unittest.TestCase): + """Tests for write mode functionality (CLICKHOUSE_ALLOW_WRITE_ACCESS=true). + + Note: These tests use @patch.dict to temporarily set CLICKHOUSE_ALLOW_WRITE_ACCESS=true + and CLICKHOUSE_ALLOW_DROP=true without affecting other tests. This allows testing + write operations in isolation. + """ + + @classmethod + def setUpClass(cls): + """Set up the environment before tests.""" + cls.client = create_clickhouse_client() + cls.test_db = "test_write_mode_db" + cls.test_table = "write_test_table" + + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + cls.client.command(f"CREATE DATABASE {cls.test_db}") + + @classmethod + def tearDownClass(cls): + """Clean up the environment after tests.""" + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + + def test_insert_query(self): + """Test that INSERT queries work when writes are enabled.""" + create_query = f""" + CREATE TABLE {self.test_db}.{self.test_table} ( + id UInt32, + value String + ) ENGINE = MergeTree() + ORDER BY id + """ + result = run_query(create_query) + self.assertIsInstance(result, dict) + + insert_query = f""" + INSERT INTO {self.test_db}.{self.test_table} (id, value) + VALUES (1, 'test_value') + """ + result = run_query(insert_query) + self.assertIsInstance(result, dict) + + select_query = f"SELECT * FROM {self.test_db}.{self.test_table}" + result = run_query(select_query) + self.assertEqual(len(result["rows"]), 1) + self.assertEqual(result["rows"][0][0], 1) + self.assertEqual(result["rows"][0][1], "test_value") + + self.client.command(f"DROP TABLE {self.test_db}.{self.test_table}") + + def test_create_table_query(self): + """Test that CREATE TABLE queries work when writes are enabled.""" + create_query = f""" + CREATE TABLE {self.test_db}.ddl_test ( + id UInt32, + name String + ) ENGINE = MergeTree() + ORDER BY id + """ + result = run_query(create_query) + self.assertIsInstance(result, dict) + + tables = list_tables(self.test_db) + table_names = [t["name"] for t in tables] + self.assertIn("ddl_test", table_names) + + self.client.command(f"DROP TABLE {self.test_db}.ddl_test") + + def test_alter_table_query(self): + """Test that ALTER TABLE queries work when writes are enabled.""" + self.client.command(f""" + CREATE TABLE {self.test_db}.alter_test ( + id UInt32 + ) ENGINE = MergeTree() + ORDER BY id + """) + + alter_query = f""" + ALTER TABLE {self.test_db}.alter_test + ADD COLUMN name String + """ + result = run_query(alter_query) + self.assertIsInstance(result, dict) + + tables = list_tables(self.test_db, like="alter_test") + self.assertEqual(len(tables), 1) + column_names = [col["name"] for col in tables[0]["columns"]] + self.assertIn("name", column_names) + + self.client.command(f"DROP TABLE {self.test_db}.alter_test") + + +@patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "true"}) +class TestClickhouseDropProtection(unittest.TestCase): + """Tests for DROP operation protection. + + These tests verify that DROP operations (DROP TABLE, DROP DATABASE) are + properly controlled by the CLICKHOUSE_ALLOW_DROP flag when writes are enabled. + """ + + @classmethod + def setUpClass(cls): + """Set up the environment before tests.""" + cls.client = create_clickhouse_client() + cls.test_db = "test_drop_protection_db" + cls.test_table = "drop_test_table" + + # Use direct client commands for setup (bypassing run_query) + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + cls.client.command(f"CREATE DATABASE {cls.test_db}") + cls.client.command(f""" + CREATE TABLE {cls.test_db}.{cls.test_table} ( + id UInt32, + value String + ) ENGINE = MergeTree() + ORDER BY id + """) + + @classmethod + def tearDownClass(cls): + """Clean up the environment after tests.""" + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + + @patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "false"}) + def test_drop_table_blocked_when_flag_not_set(self): + """Test that DROP TABLE is blocked when CLICKHOUSE_ALLOW_DROP=false.""" + drop_query = f"DROP TABLE {self.test_db}.{self.test_table}" + + # Should raise ToolError due to DROP protection + with self.assertRaises(ToolError) as context: + run_query(drop_query) + + error_msg = str(context.exception) + self.assertIn("DROP operations are not allowed", error_msg) + self.assertIn("CLICKHOUSE_ALLOW_DROP=true", error_msg) + + @patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "false"}) + def test_drop_database_blocked_when_flag_not_set(self): + """Test that DROP DATABASE is blocked when CLICKHOUSE_ALLOW_DROP=false.""" + temp_db = "test_temp_drop_db" + self.client.command(f"CREATE DATABASE IF NOT EXISTS {temp_db}") + + drop_query = f"DROP DATABASE {temp_db}" + + # Should raise ToolError due to DROP protection + with self.assertRaises(ToolError) as context: + run_query(drop_query) + + error_msg = str(context.exception) + self.assertIn("DROP operations are not allowed", error_msg) + self.assertIn("CLICKHOUSE_ALLOW_DROP=true", error_msg) + + self.client.command(f"DROP DATABASE IF EXISTS {temp_db}") + + def test_drop_allowed_when_flag_set(self): + """Test that DROP works when CLICKHOUSE_ALLOW_DROP=true.""" + # This test runs with ALLOW_DROP=true from the class decorator + temp_table = "temp_drop_table" + self.client.command(f""" + CREATE TABLE {self.test_db}.{temp_table} ( + id UInt32 + ) ENGINE = MergeTree() + ORDER BY id + """) + + # Should succeed + drop_query = f"DROP TABLE {self.test_db}.{temp_table}" + result = run_query(drop_query) + self.assertIsInstance(result, dict) + + @patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "false"}) + def test_insert_allowed_without_drop_flag(self): + """Test that INSERT works even when CLICKHOUSE_ALLOW_DROP=false.""" + insert_query = f""" + INSERT INTO {self.test_db}.{self.test_table} (id, value) + VALUES (1, 'test_value') + """ + result = run_query(insert_query) + self.assertIsInstance(result, dict) + + select_query = f"SELECT * FROM {self.test_db}.{self.test_table}" + result = run_query(select_query) + self.assertGreaterEqual(len(result["rows"]), 1) + + @patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "true", "CLICKHOUSE_ALLOW_DROP": "false"}) + def test_create_allowed_without_drop_flag(self): + """Test that CREATE TABLE works even when CLICKHOUSE_ALLOW_DROP=false.""" + create_query = f""" + CREATE TABLE {self.test_db}.create_test ( + id UInt32 + ) ENGINE = MergeTree() + ORDER BY id + """ + result = run_query(create_query) + self.assertIsInstance(result, dict) + + tables = list_tables(self.test_db) + table_names = [t["name"] for t in tables] + self.assertIn("create_test", table_names) + + self.client.command(f"DROP TABLE {self.test_db}.create_test") + + +class TestClickhouseReadOnlyMode(unittest.TestCase): + """Tests for read-only mode functionality (CLICKHOUSE_ALLOW_WRITE_ACCESS=false, default). + + These tests verify that write operations are properly blocked when + CLICKHOUSE_ALLOW_WRITE_ACCESS is false (the default setting). + """ + + @classmethod + def setUpClass(cls): + """Set up the environment before tests.""" + cls.env_patcher = patch.dict(os.environ, {"CLICKHOUSE_ALLOW_WRITE_ACCESS": "false"}) + cls.env_patcher.start() + + cls.client = create_clickhouse_client() + cls.test_db = "test_readonly_db" + cls.test_table = "readonly_test_table" + + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + cls.client.command(f"CREATE DATABASE {cls.test_db}") + cls.client.command(f""" + CREATE TABLE {cls.test_db}.{cls.test_table} ( + id UInt32, + value String + ) ENGINE = MergeTree() + ORDER BY id + """) + + @classmethod + def tearDownClass(cls): + """Clean up the environment after tests.""" + cls.client.command(f"DROP DATABASE IF EXISTS {cls.test_db}") + cls.env_patcher.stop() + + def test_insert_blocked_in_readonly_mode(self): + """Test that INSERT queries fail when CLICKHOUSE_ALLOW_WRITE_ACCESS=false.""" + insert_query = f""" + INSERT INTO {self.test_db}.{self.test_table} (id, value) + VALUES (1, 'should_fail') + """ + + with self.assertRaises(ToolError) as context: + run_query(insert_query) + + error_msg = str(context.exception) + self.assertIn("Query execution failed", error_msg) + self.assertTrue( + "readonly" in error_msg.lower() or "cannot execute" in error_msg.lower(), + f"Expected readonly-related error, got: {error_msg}", + ) + + def test_create_table_blocked_in_readonly_mode(self): + """Test that CREATE TABLE queries fail when CLICKHOUSE_ALLOW_WRITE_ACCESS=false.""" + create_query = f""" + CREATE TABLE {self.test_db}.should_not_exist ( + id UInt32 + ) ENGINE = MergeTree() + ORDER BY id + """ + + with self.assertRaises(ToolError) as context: + run_query(create_query) + + error_msg = str(context.exception) + self.assertIn("Query execution failed", error_msg) + self.assertTrue( + "readonly" in error_msg.lower() or "cannot execute" in error_msg.lower(), + f"Expected readonly-related error, got: {error_msg}", + ) + + def test_alter_table_blocked_in_readonly_mode(self): + """Test that ALTER TABLE queries fail when CLICKHOUSE_ALLOW_WRITE_ACCESS=false.""" + alter_query = f""" + ALTER TABLE {self.test_db}.{self.test_table} + ADD COLUMN new_column String + """ + + with self.assertRaises(ToolError) as context: + run_query(alter_query) + + error_msg = str(context.exception) + self.assertIn("Query execution failed", error_msg) + self.assertTrue( + "readonly" in error_msg.lower() or "cannot execute" in error_msg.lower(), + f"Expected readonly-related error, got: {error_msg}", + ) + + def test_select_allowed_in_readonly_mode(self): + """Test that SELECT queries work normally in read-only mode.""" + select_query = f"SELECT * FROM {self.test_db}.{self.test_table}" + result = run_query(select_query) + + self.assertIsInstance(result, dict) + self.assertIn("columns", result) + self.assertIn("rows", result) + + if __name__ == "__main__": unittest.main()