Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/embucket-lambda/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub struct EnvConfig {
pub embucket_version: String,
pub metastore_config: Option<PathBuf>,
pub jwt_secret: Option<String>,
pub read_only: bool,
}

impl EnvConfig {
Expand All @@ -40,6 +41,7 @@ impl EnvConfig {
embucket_version: env_or_default("EMBUCKET_VERSION", "0.1.0"),
metastore_config: env::var("METASTORE_CONFIG").ok().map(PathBuf::from),
jwt_secret: env::var("JWT_SECRET").ok(),
read_only: parse_env("READ_ONLY").unwrap_or(true),
}
}

Expand All @@ -56,6 +58,7 @@ impl EnvConfig {
mem_enable_track_consumers_pool: self.mem_enable_track_consumers_pool,
disk_pool_size_mb: self.disk_pool_size_mb,
query_history_rows_limit: self.query_history_rows_limit,
read_only: self.read_only,
}
}
}
Expand Down
1 change: 1 addition & 0 deletions crates/embucket-lambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ async fn main() -> Result<(), LambdaError> {
mem_pool_size_mb = ?env_config.mem_pool_size_mb,
disk_pool_size_mb = ?env_config.disk_pool_size_mb,
bootstrap_default_entities = env_config.bootstrap_default_entities,
read_only = env_config.read_only,
metastore_config = env_config.metastore_config.as_ref().map(|p| p.display().to_string()),
"Loaded Lambda configuration"
);
Expand Down
8 changes: 8 additions & 0 deletions crates/embucketd/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,14 @@ pub struct CliOpts {
help = "JWT secret for auth"
)]
jwt_secret: Option<String>,

#[arg(
long,
env = "READ_ONLY",
default_value = "true",
help = "If the service should only accept read only commands (selects)"
)]
pub read_only: bool,
}

impl CliOpts {
Expand Down
1 change: 1 addition & 0 deletions crates/embucketd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ async fn async_main(
mem_enable_track_consumers_pool: opts.mem_enable_track_consumers_pool,
disk_pool_size_mb: opts.disk_pool_size_mb,
query_history_rows_limit: opts.query_history_rows_limit,
read_only: opts.read_only,
};
let host = opts.host.clone().unwrap();
let port = opts.port.unwrap();
Expand Down
7 changes: 7 additions & 0 deletions crates/executor/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -632,6 +632,13 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},

#[snafu(display("Statement not supported in read_only mode: {statement}"))]
NotSupportedStatementInReadOnlyMode {
statement: String,
#[snafu(implicit)]
location: Location,
},
}

impl Error {
Expand Down
127 changes: 101 additions & 26 deletions crates/executor/src/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,50 @@ impl UserQuery {
// 3. Single place to construct Logical plan from this AST
// 4. Single place to rewrite-optimize-adjust logical plan
// etc
if self.session.config.read_only {
Comment thread
DanCodedThis marked this conversation as resolved.
match statement {
DFStatement::Statement(s) => match *s {
Statement::Query(subquery) => {
return self.execute_query_statement(subquery).await;
}
Statement::Use(entity) => {
return self.execute_use_statement(entity).await;
}
other => {
return ex_error::NotSupportedStatementInReadOnlyModeSnafu {
statement: other.to_string(),
}
.fail();
}
},
DFStatement::Explain(explain) => match *explain.statement {
DFStatement::Statement(s) => match *s {
Statement::Query(..) | Statement::Use(..) => {
return self.execute_sql(&self.query).await;
}
other => {
return ex_error::NotSupportedStatementInReadOnlyModeSnafu {
statement: other.to_string(),
}
.fail();
}
},
other => {
return ex_error::NotSupportedStatementInReadOnlyModeSnafu {
statement: other.to_string(),
}
.fail();
}
},
other => {
return ex_error::NotSupportedStatementInReadOnlyModeSnafu {
statement: other.to_string(),
}
.fail();
}
}
}

if let DFStatement::Statement(s) = statement {
match *s {
Statement::AlterSession {
Expand All @@ -300,29 +344,7 @@ impl UserQuery {
return self.status_response();
}
Statement::Use(entity) => {
let (variable, value) = match entity {
Use::Catalog(n) => ("catalog", n.to_string()),
Use::Schema(n) => ("schema", n.to_string()),
Use::Database(n) => ("database", n.to_string()),
Use::Warehouse(n) => ("warehouse", n.to_string()),
Use::Role(n) => ("role", n.to_string()),
Use::Object(n) => ("object", n.to_string()),
Use::SecondaryRoles(sr) => ("secondary_roles", sr.to_string()),
Use::Default => ("", String::new()),
};
if variable.is_empty() | value.is_empty() {
return ex_error::OnyUseWithVariablesSnafu.fail();
}
let params = HashMap::from([(
variable.to_string(),
SessionProperty::from_str_value(
variable.to_string(),
value,
Some(self.session.ctx.session_id()),
),
)]);
self.session.set_session_variable(true, params)?;
return self.status_response();
return self.execute_use_statement(entity).await;
}
Statement::Set(statement) => {
use datafusion::sql::sqlparser::ast::Set;
Expand Down Expand Up @@ -397,9 +419,8 @@ impl UserQuery {
Statement::Truncate { table_names, .. } => {
return Box::pin(self.truncate_table(table_names)).await;
}
Statement::Query(mut subquery) => {
self.traverse_and_update_query(subquery.as_mut()).await;
return Box::pin(self.execute_with_custom_plan(&subquery.to_string())).await;
Statement::Query(subquery) => {
return self.execute_query_statement(subquery).await;
}
Statement::Drop { .. } => return Box::pin(self.drop_query(*s)).await,
Statement::Merge { .. } => return Box::pin(self.merge_query(*s)).await,
Expand All @@ -418,6 +439,60 @@ impl UserQuery {
self.execute_sql(&self.query).await
}

/// Runs a statement that was parsed as `Statement::Query`.
///
/// The query AST is first handed (mutably) to `traverse_and_update_query`; the
/// resulting AST is then rendered back to SQL text and executed through
/// `execute_with_custom_plan`.
///
/// # Errors
///
/// Returns whatever error `execute_with_custom_plan` reports.
#[instrument(
    name = "UserQuery::execute_query_statement",
    level = "debug",
    skip(self),
    fields(
        statement,
        query_id = self.query_context.query_id.to_string(),
    ),
    err
)]
pub async fn execute_query_statement(
    &mut self,
    mut subquery: Box<Query>,
) -> Result<QueryResult> {
    // The traversal must finish before the AST is serialized, so that the SQL
    // text below is produced from the updated tree.
    self.traverse_and_update_query(subquery.as_mut()).await;
    let rewritten_sql = subquery.to_string();
    // The inner future stays boxed, exactly as the previous inline call site did.
    Box::pin(self.execute_with_custom_plan(&rewritten_sql)).await
}

/// Handles a `USE ...` statement by storing its target as a session variable.
///
/// Each `Use` variant is mapped to a session property name (`catalog`, `schema`,
/// `database`, `warehouse`, `role`, `object`, `secondary_roles`) whose value is
/// the textual form of the target. The property is written with
/// `set_session_variable` and a status response is returned.
///
/// # Errors
///
/// Fails with `OnyUseWithVariables` when the statement is a bare `USE`
/// (`Use::Default`) or when the target renders to an empty string, and
/// propagates any error from `set_session_variable`.
#[instrument(
    name = "UserQuery::execute_use_statement",
    level = "debug",
    skip(self),
    fields(
        statement,
        query_id = self.query_context.query_id.to_string(),
    ),
    err
)]
pub async fn execute_use_statement(&mut self, entity: Use) -> Result<QueryResult> {
    let (property, target) = match entity {
        Use::Catalog(name) => ("catalog", name.to_string()),
        Use::Schema(name) => ("schema", name.to_string()),
        Use::Database(name) => ("database", name.to_string()),
        Use::Warehouse(name) => ("warehouse", name.to_string()),
        Use::Role(name) => ("role", name.to_string()),
        Use::Object(name) => ("object", name.to_string()),
        Use::SecondaryRoles(roles) => ("secondary_roles", roles.to_string()),
        // A bare `USE` names nothing to switch to.
        Use::Default => return ex_error::OnyUseWithVariablesSnafu.fail(),
    };
    // Every remaining variant has a non-empty property name, so only the
    // rendered target still needs to be checked.
    if target.is_empty() {
        return ex_error::OnyUseWithVariablesSnafu.fail();
    }

    let key = property.to_string();
    let session_property = SessionProperty::from_str_value(
        key.clone(),
        target,
        Some(self.session.ctx.session_id()),
    );
    let params = HashMap::from([(key, session_property)]);
    self.session.set_session_variable(true, params)?;
    self.status_response()
}

#[instrument(name = "UserQuery::get_catalog", level = "trace", skip(self), err)]
pub fn get_catalog(&self, name: &str) -> Result<Arc<dyn CatalogProvider>> {
self.session
Expand Down
131 changes: 131 additions & 0 deletions crates/executor/src/tests/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,134 @@ async fn test_query_timeout() {
"Expected query execution exceeded timeout error but got {res:?}"
);
}

/// Verifies read-only mode end to end: a table is seeded through a service built
/// from the default config, then a second service sharing the same metastore but
/// configured with `read_only = true` must reject `CREATE`/`INSERT` while still
/// accepting `SELECT`, `EXPLAIN SELECT` and `USE`.
#[tokio::test]
#[allow(clippy::expect_used)]
async fn test_execute_read_only_mode() {
    const SESSION: &str = "test_session_id";

    // Phase 1: seed data through a service using the default (non read-only) config.
    let metastore = Arc::new(InMemoryMetastore::new());
    let writable_svc =
        CoreExecutionService::new(metastore.clone(), Arc::new(Config::default()))
            .await
            .expect("Failed to create execution service");

    writable_svc
        .create_session(SESSION)
        .await
        .expect("Failed to create session");

    for sql in [
        "CREATE OR REPLACE TABLE fetch_test(c1 INT)",
        "INSERT INTO fetch_test VALUES (1),(2),(3),(4)",
    ] {
        writable_svc
            .query(SESSION, sql, QueryContext::default())
            .await
            .expect("Failed to execute query");
    }

    drop(writable_svc);

    // Phase 2: a fresh service over the same metastore, this time in read-only mode.
    let read_only_config = Config::default().with_read_only(true);
    let read_only_svc = CoreExecutionService::new(metastore, Arc::new(read_only_config))
        .await
        .expect("Failed to create execution service");

    read_only_svc
        .create_session(SESSION)
        .await
        .expect("Failed to create session");

    // Writes must be refused.
    for sql in [
        "CREATE OR REPLACE TABLE fetch_test(c1 INT)",
        "INSERT INTO fetch_test VALUES (1),(2),(3),(4)",
    ] {
        read_only_svc
            .query(SESSION, sql, QueryContext::default())
            .await
            .expect_err("Read only mode failed");
    }

    // Reads, their EXPLAIN forms, and USE must keep working, in this order.
    // Continuation lines of the multi-line literals are left flush-left so the
    // SQL text stays exactly as it was.
    for sql in [
        "SELECT 1",
        "EXPLAIN SELECT 1",
        "WITH limited_data AS (
SELECT c1 FROM fetch_test ORDER BY c1 FETCH FIRST 3 ROWS
)
SELECT * FROM limited_data ORDER BY c1;",
        "EXPLAIN WITH limited_data AS (
SELECT c1 FROM fetch_test ORDER BY c1 FETCH FIRST 3 ROWS
)
SELECT * FROM limited_data ORDER BY c1;",
        "SELECT 1 UNION ALL SELECT c1 FROM fetch_test;",
        "EXPLAIN SELECT 1 UNION ALL SELECT c1 FROM fetch_test;",
        "USE SCHEMA public;",
    ] {
        read_only_svc
            .query(SESSION, sql, QueryContext::default())
            .await
            .expect("Failed to execute query in read only mode");
    }
}
8 changes: 8 additions & 0 deletions crates/executor/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct Config {
pub mem_enable_track_consumers_pool: Option<bool>,
pub disk_pool_size_mb: Option<usize>,
pub query_history_rows_limit: usize,
pub read_only: bool,
}

impl Default for Config {
Expand All @@ -58,6 +59,7 @@ impl Default for Config {
mem_enable_track_consumers_pool: None,
disk_pool_size_mb: None,
query_history_rows_limit: DEFAULT_QUERY_HISTORY_ROWS_LIMIT,
read_only: false,
}
}
}
Expand Down Expand Up @@ -89,6 +91,12 @@ impl Config {
self.query_history_rows_limit = limit;
self
}

/// Builder-style setter: consumes this `Config` and returns it with the
/// `read_only` flag replaced by the given value.
///
/// When the flag is set, the executor only runs query and `USE` statements
/// (and `EXPLAIN` of those); other statements fail with
/// `NotSupportedStatementInReadOnlyMode`.
#[must_use]
pub const fn with_read_only(mut self, read_only: bool) -> Self {
self.read_only = read_only;
self
}
}

#[derive(Copy, Clone, PartialEq, Eq, EnumString, Debug, Display, Default)]
Expand Down