diff --git a/Cargo.lock b/Cargo.lock index 824fa057..7ba8af2e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3238,6 +3238,7 @@ dependencies = [ "axum 0.8.6", "base64 0.22.1", "catalog-metastore", + "cfg-if", "executor", "flate2", "http 1.3.1", diff --git a/crates/embucket-lambda/src/config.rs b/crates/embucket-lambda/src/config.rs index d7c0467c..78dda9ed 100644 --- a/crates/embucket-lambda/src/config.rs +++ b/crates/embucket-lambda/src/config.rs @@ -18,6 +18,7 @@ pub struct EnvConfig { pub embucket_version: String, pub metastore_config: Option, pub jwt_secret: Option, + pub read_only: bool, } impl EnvConfig { @@ -40,6 +41,7 @@ impl EnvConfig { embucket_version: env_or_default("EMBUCKET_VERSION", "0.1.0"), metastore_config: env::var("METASTORE_CONFIG").ok().map(PathBuf::from), jwt_secret: env::var("JWT_SECRET").ok(), + read_only: parse_env("READ_ONLY").unwrap_or(true), } } @@ -56,6 +58,7 @@ impl EnvConfig { mem_enable_track_consumers_pool: self.mem_enable_track_consumers_pool, disk_pool_size_mb: self.disk_pool_size_mb, query_history_rows_limit: self.query_history_rows_limit, + read_only: self.read_only, } } } diff --git a/crates/embucket-lambda/src/main.rs b/crates/embucket-lambda/src/main.rs index 22e9a7ab..402c0f59 100644 --- a/crates/embucket-lambda/src/main.rs +++ b/crates/embucket-lambda/src/main.rs @@ -48,6 +48,7 @@ async fn main() -> Result<(), LambdaError> { mem_pool_size_mb = ?env_config.mem_pool_size_mb, disk_pool_size_mb = ?env_config.disk_pool_size_mb, bootstrap_default_entities = env_config.bootstrap_default_entities, + read_only = env_config.read_only, metastore_config = env_config.metastore_config.as_ref().map(|p| p.display().to_string()), "Loaded Lambda configuration" ); diff --git a/crates/embucketd/src/cli.rs b/crates/embucketd/src/cli.rs index 4c2e19cd..2f674117 100644 --- a/crates/embucketd/src/cli.rs +++ b/crates/embucketd/src/cli.rs @@ -169,6 +169,14 @@ pub struct CliOpts { help = "JWT secret for auth" )] jwt_secret: Option, + + #[arg( + long, + env = "READ_ONLY", + default_value = "true", + help = "If the service should only accept read only commands (selects)" + )] + pub read_only: bool, } impl CliOpts { diff --git a/crates/embucketd/src/main.rs b/crates/embucketd/src/main.rs index 1fc4eb67..f8664444 100644 --- a/crates/embucketd/src/main.rs +++ b/crates/embucketd/src/main.rs @@ -145,6 +145,7 @@ async fn async_main( mem_enable_track_consumers_pool: opts.mem_enable_track_consumers_pool, disk_pool_size_mb: opts.disk_pool_size_mb, query_history_rows_limit: opts.query_history_rows_limit, + read_only: opts.read_only, }; let host = opts.host.clone().unwrap(); let port = opts.port.unwrap(); diff --git a/crates/executor/src/error.rs b/crates/executor/src/error.rs index a6f628e1..223dcb6a 100644 --- a/crates/executor/src/error.rs +++ b/crates/executor/src/error.rs @@ -632,6 +632,13 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + + #[snafu(display("Statement not supported in read_only mode: {statement}"))] + NotSupportedStatementInReadOnlyMode { + statement: String, + #[snafu(implicit)] + location: Location, + }, } impl Error { diff --git a/crates/executor/src/query.rs b/crates/executor/src/query.rs index 2d70cb3a..b64d6da3 100644 --- a/crates/executor/src/query.rs +++ b/crates/executor/src/query.rs @@ -279,6 +279,50 @@ impl UserQuery { // 3. Single place to construct Logical plan from this AST // 4. Single place to rewrite-optimize-adjust logical plan // etc + if self.session.config.read_only { + match statement { + DFStatement::Statement(s) => match *s { + Statement::Query(subquery) => { + return self.execute_query_statement(subquery).await; + } + Statement::Use(entity) => { + return self.execute_use_statement(entity).await; + } + other => { + return ex_error::NotSupportedStatementInReadOnlyModeSnafu { + statement: other.to_string(), + } + .fail(); + } + }, + DFStatement::Explain(explain) => match *explain.statement { + DFStatement::Statement(s) => match *s { + Statement::Query(..) | Statement::Use(..) => { + return self.execute_sql(&self.query).await; + } + other => { + return ex_error::NotSupportedStatementInReadOnlyModeSnafu { + statement: other.to_string(), + } + .fail(); + } + }, + other => { + return ex_error::NotSupportedStatementInReadOnlyModeSnafu { + statement: other.to_string(), + } + .fail(); + } + }, + other => { + return ex_error::NotSupportedStatementInReadOnlyModeSnafu { + statement: other.to_string(), + } + .fail(); + } + } + } + if let DFStatement::Statement(s) = statement { match *s { Statement::AlterSession { @@ -300,29 +344,7 @@ impl UserQuery { return self.status_response(); } Statement::Use(entity) => { - let (variable, value) = match entity { - Use::Catalog(n) => ("catalog", n.to_string()), - Use::Schema(n) => ("schema", n.to_string()), - Use::Database(n) => ("database", n.to_string()), - Use::Warehouse(n) => ("warehouse", n.to_string()), - Use::Role(n) => ("role", n.to_string()), - Use::Object(n) => ("object", n.to_string()), - Use::SecondaryRoles(sr) => ("secondary_roles", sr.to_string()), - Use::Default => ("", String::new()), - }; - if variable.is_empty() | value.is_empty() { - return ex_error::OnyUseWithVariablesSnafu.fail(); - } - let params = HashMap::from([( - variable.to_string(), - SessionProperty::from_str_value( - variable.to_string(), - value, - Some(self.session.ctx.session_id()), - ), - )]); - self.session.set_session_variable(true, params)?; - return self.status_response(); + return self.execute_use_statement(entity).await; } Statement::Set(statement) => { use datafusion::sql::sqlparser::ast::Set; @@ -397,9 +419,8 @@ impl UserQuery { Statement::Truncate { table_names, .. } => { return Box::pin(self.truncate_table(table_names)).await; } - Statement::Query(mut subquery) => { - self.traverse_and_update_query(subquery.as_mut()).await; - return Box::pin(self.execute_with_custom_plan(&subquery.to_string())).await; + Statement::Query(subquery) => { + return self.execute_query_statement(subquery).await; } Statement::Drop { .. } => return Box::pin(self.drop_query(*s)).await, Statement::Merge { .. } => return Box::pin(self.merge_query(*s)).await, @@ -418,6 +439,60 @@ impl UserQuery { self.execute_sql(&self.query).await } + #[instrument( + name = "UserQuery::execute_query_statement", + level = "debug", + skip(self), + fields( + statement, + query_id = self.query_context.query_id.to_string(), + ), + err + )] + pub async fn execute_query_statement( + &mut self, + mut subquery: Box, + ) -> Result { + self.traverse_and_update_query(subquery.as_mut()).await; + Box::pin(self.execute_with_custom_plan(&subquery.to_string())).await + } + + #[instrument( + name = "UserQuery::execute_use_statement", + level = "debug", + skip(self), + fields( + statement, + query_id = self.query_context.query_id.to_string(), + ), + err + )] + pub async fn execute_use_statement(&mut self, entity: Use) -> Result { + let (variable, value) = match entity { + Use::Catalog(n) => ("catalog", n.to_string()), + Use::Schema(n) => ("schema", n.to_string()), + Use::Database(n) => ("database", n.to_string()), + Use::Warehouse(n) => ("warehouse", n.to_string()), + Use::Role(n) => ("role", n.to_string()), + Use::Object(n) => ("object", n.to_string()), + Use::SecondaryRoles(sr) => ("secondary_roles", sr.to_string()), + Use::Default => ("", String::new()), + }; + if variable.is_empty() | value.is_empty() { + return ex_error::OnyUseWithVariablesSnafu.fail(); + } + let params = HashMap::from([( + variable.to_string(), + SessionProperty::from_str_value( + variable.to_string(), + value, + Some(self.session.ctx.session_id()), + ), + )]); + self.session.set_session_variable(true, params)?; + self.status_response() + } + #[instrument(name = "UserQuery::get_catalog", level = "trace", skip(self), err)] pub fn get_catalog(&self, name: &str) -> Result> { self.session diff --git a/crates/executor/src/tests/service.rs b/crates/executor/src/tests/service.rs index 2f461d09..7658ca15 100644 --- a/crates/executor/src/tests/service.rs +++ b/crates/executor/src/tests/service.rs @@ -432,3 +432,134 @@ async fn test_query_timeout() { "Expected query execution exceeded timeout error but got {res:?}" ); } + +#[tokio::test] +#[allow(clippy::expect_used)] +async fn test_execute_read_only_mode() { + //setup + let metastore = Arc::new(InMemoryMetastore::new()); + let execution_svc = CoreExecutionService::new(metastore.clone(), Arc::new(Config::default())) + .await + .expect("Failed to create execution service"); + + execution_svc + .create_session("test_session_id") + .await + .expect("Failed to create session"); + + execution_svc + .query( + "test_session_id", + "CREATE OR REPLACE TABLE fetch_test(c1 INT)", + QueryContext::default(), + ) + .await + .expect("Failed to execute query"); + + execution_svc + .query( + "test_session_id", + "INSERT INTO fetch_test VALUES (1),(2),(3),(4)", + QueryContext::default(), + ) + .await + .expect("Failed to execute query"); + + drop(execution_svc); + + //read only mode test + let execution_svc = + CoreExecutionService::new(metastore, Arc::new(Config::default().with_read_only(true))) + .await + .expect("Failed to create execution service"); + + execution_svc + .create_session("test_session_id") + .await + .expect("Failed to create session"); + + //should fail + execution_svc + .query( + "test_session_id", + "CREATE OR REPLACE TABLE fetch_test(c1 INT)", + QueryContext::default(), + ) + .await + .expect_err("Read only mode failed"); + + //should fail + execution_svc + .query( + "test_session_id", + "INSERT INTO fetch_test VALUES (1),(2),(3),(4)", + QueryContext::default(), + ) + .await + .expect_err("Read only mode failed"); + + execution_svc + .query("test_session_id", "SELECT 1", QueryContext::default()) + .await + .expect("Failed to execute query in read only mode"); + + execution_svc + .query( + "test_session_id", + "EXPLAIN SELECT 1", + QueryContext::default(), + ) + .await + .expect("Failed to execute query in read only mode"); + + execution_svc + .query( + "test_session_id", + "WITH limited_data AS ( + SELECT c1 FROM fetch_test ORDER BY c1 FETCH FIRST 3 ROWS + ) + SELECT * FROM limited_data ORDER BY c1;", + QueryContext::default(), + ) + .await + .expect("Failed to execute query in read only mode"); + + execution_svc + .query( + "test_session_id", + "EXPLAIN WITH limited_data AS ( + SELECT c1 FROM fetch_test ORDER BY c1 FETCH FIRST 3 ROWS + ) + SELECT * FROM limited_data ORDER BY c1;", + QueryContext::default(), + ) + .await + .expect("Failed to execute query in read only mode"); + + execution_svc + .query( + "test_session_id", + "SELECT 1 UNION ALL SELECT c1 FROM fetch_test;", + QueryContext::default(), + ) + .await + .expect("Failed to execute query in read only mode"); + + execution_svc + .query( + "test_session_id", + "EXPLAIN SELECT 1 UNION ALL SELECT c1 FROM fetch_test;", + QueryContext::default(), + ) + .await + .expect("Failed to execute query in read only mode"); + + execution_svc + .query( + "test_session_id", + "USE SCHEMA public;", + QueryContext::default(), + ) + .await + .expect("Failed to execute query in read only mode"); +} diff --git a/crates/executor/src/utils.rs b/crates/executor/src/utils.rs index c25e952b..b6f9f270 100644 --- a/crates/executor/src/utils.rs +++ b/crates/executor/src/utils.rs @@ -43,6 +43,7 @@ pub struct Config { pub mem_enable_track_consumers_pool: Option, pub disk_pool_size_mb: Option, pub query_history_rows_limit: usize, + pub read_only: bool, } impl Default for Config { @@ -58,6 +59,7 @@ impl Default for Config { mem_enable_track_consumers_pool: None, disk_pool_size_mb: None, query_history_rows_limit: DEFAULT_QUERY_HISTORY_ROWS_LIMIT, + read_only: false, } } } @@ -89,6 +91,12 @@ impl Config { self.query_history_rows_limit = limit; self } + + #[must_use] + pub const fn with_read_only(mut self, read_only: bool) -> Self { + self.read_only = read_only; + self + } } #[derive(Copy, Clone, PartialEq, Eq, EnumString, Debug, Display, Default)]