adapter: Restrict the queries that can run on mz_introspection #18312

Merged
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 12 additions & 3 deletions misc/dbt-materialize/CHANGELOG.md
@@ -1,5 +1,14 @@
# dbt-materialize Changelog

## Unreleased

* **Breaking change.** Automatically run introspection queries in the
`mz_introspection` cluster via the new `auto_route_introspection_queries`
session variable, instead of hardcoding the cluster on connection.

This change requires [Materialize >=0.49.0](https://materialize.com/docs/releases/v0.49/).
**Users of older versions should pin `dbt-materialize` to `v1.4.0`.**

## 1.4.0 - 2023-02-03

* Upgrade to `dbt-postgres` v1.4.0.
@@ -55,8 +64,8 @@

## 1.2.0 - 2022-08-31

-* Enable additional configuration for indexes created on view,
-materializedview, or source materializations. Fix to use Materialize's
+* Enable additional configuration for indexes created on `view`,
+`materializedview`, or `source` materializations. Fix to use Materialize's
internal naming convention when creating indexes without providing
explicit names.

@@ -92,7 +101,7 @@
```

* A new `cluster` option for indexes on view, materializedview, or source
-materializations. If 'cluster' is not supplied, indexes will be created
+materializations. If `cluster` is not supplied, indexes will be created
in the cluster used to create the materialization.

```sql
2 changes: 1 addition & 1 deletion misc/dbt-materialize/README.md
@@ -18,7 +18,7 @@ pip install dbt-materialize # install the adapter
## Requirements

<!-- If you update this, bump the constraint in connections.py too. -->
-`dbt-materialize` requires Materialize v0.28.0+.
+`dbt-materialize` requires Materialize v0.49.0+.

## Configuring your profile

20 changes: 10 additions & 10 deletions misc/dbt-materialize/dbt/adapters/materialize/connections.py
@@ -23,7 +23,7 @@
from dbt.semver import versions_compatible

# If you bump this version, bump it in README.md too.
-SUPPORTED_MATERIALIZE_VERSIONS = ">=0.28.0"
+SUPPORTED_MATERIALIZE_VERSIONS = ">=0.49.0"

logger = AdapterLogger("Materialize")

@@ -64,22 +64,22 @@ def open(cls, connection):

cursor = connection.handle.cursor()

-# Upon connection, dbt performs introspection queries that should run in
-# the mz_introspection cluster for optimal performance. Each materialization
-# should then handle falling back to the default connection cluster if no
-# cluster configuration is specified at the model level.
-mz_introspection_cluster = "mz_introspection"
-logger.debug("Switching to cluster '{}'".format(mz_introspection_cluster))
-cursor.execute("SET cluster = %s" % mz_introspection_cluster)

-cursor.execute("SELECT mz_version()")
+# Check for the current DB version using the "mz_introspection" cluster in case "default"
+# doesn't exist.
+cursor.execute("SHOW mz_version")
mz_version = cursor.fetchone()[0].split()[0].strip("v")

if not versions_compatible(mz_version, SUPPORTED_MATERIALIZE_VERSIONS):
raise dbt.exceptions.DbtRuntimeError(
f"Detected unsupported Materialize version {mz_version}\n"
f" Supported versions: {SUPPORTED_MATERIALIZE_VERSIONS}"
)

# Make sure 'auto_route_introspection_queries' is enabled.
var_name = "auto_route_introspection_queries"
logger.debug(f"Enabling {var_name}")
cursor.execute(f"SET {var_name} = true")

return connection

# Disable transactions. Materialize transactions do not support arbitrary
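The version gate in `open()` can be sketched without a database connection. This is a minimal illustration, not the adapter's code: `parse_mz_version` and `is_supported` are hypothetical helpers, the tuple comparison is a simplified stand-in for `dbt.semver.versions_compatible`, and the sample strings assume that `SHOW mz_version` returns something shaped like `v0.49.0 (<build hash>)`.

```python
import re

# Mirrors the ">=0.49.0" constraint from the diff, as a comparable tuple.
SUPPORTED_MIN_VERSION = (0, 49, 0)


def parse_mz_version(raw: str) -> tuple:
    """Extract (major, minor, patch) from a string such as 'v0.49.0 (abc123)'."""
    # Same first step as the adapter: first whitespace-separated token, minus the "v".
    token = raw.split()[0].strip("v")
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", token)
    if match is None:
        raise ValueError(f"unrecognized version string: {raw!r}")
    return tuple(int(part) for part in match.groups())


def is_supported(raw: str, minimum: tuple = SUPPORTED_MIN_VERSION) -> bool:
    """Simplified stand-in for dbt's semver check: plain tuple comparison."""
    return parse_mz_version(raw) >= minimum
```

Under these assumptions, a server reporting `v0.28.0` would be rejected before the adapter tries to set `auto_route_introspection_queries`, which matches the changelog's advice to pin `dbt-materialize` to `v1.4.0` on older versions.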
8 changes: 7 additions & 1 deletion misc/dbt-materialize/mzcompose.py
@@ -53,6 +53,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
parser.add_argument(
"filter", nargs="?", default="", help="limit to test cases matching filter"
)
parser.add_argument(
"-k", nargs="?", default=None, help="limit tests by keyword expressions"
)
args = parser.parse_args()

for test_case in test_cases:
@@ -63,6 +66,9 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
image=test_case.materialized_image,
volumes_extra=["secrets:/secrets"],
)
test_args = ["dbt-materialize/tests"]
if args.k:
test_args.append(f"-k {args.k}")

with c.test_case(test_case.name):
with c.override(materialized):
@@ -72,7 +78,7 @@ def workflow_default(c: Composition, parser: WorkflowArgumentParser) -> None:
c.run(
"dbt-test",
"pytest",
-"dbt-materialize/tests",
+*test_args,
env_extra={
"DBT_HOST": "materialized",
"KAFKA_ADDR": "redpanda:9092",
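The new `-k` option can be tried in isolation. The sketch below uses plain `argparse.ArgumentParser` as a stand-in for `WorkflowArgumentParser` (an assumption: the real class is Materialize's own wrapper), and `build_parser` and `build_test_args` are hypothetical helper names that wrap the same statements the diff adds.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Declare the same two arguments the workflow uses."""
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "filter", nargs="?", default="", help="limit to test cases matching filter"
    )
    parser.add_argument(
        "-k", nargs="?", default=None, help="limit tests by keyword expressions"
    )
    return parser


def build_test_args(k):
    """Build the pytest arguments the same way the diff does."""
    test_args = ["dbt-materialize/tests"]
    if k:
        # As in the diff, the flag and its value travel as one string.
        test_args.append(f"-k {k}")
    return test_args


# Parse an explicit argument list instead of sys.argv so the sketch is repeatable.
args = build_parser().parse_args(["-k", "introspection"])
pytest_args = build_test_args(args.k)
```

When `-k` is omitted, `args.k` stays `None` and the test path is passed alone, so existing invocations behave as before.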
1 change: 1 addition & 0 deletions src/adapter/Cargo.toml
@@ -58,6 +58,7 @@ reqwest = "0.11.13"
semver = "1.0.16"
serde = "1.0.152"
serde_json = "1.0.89"
smallvec = { version = "1.10.0", features = ["union"] }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false, features = ["bincode"] }
tokio = { version = "1.24.2", features = ["rt", "time"] }
tokio-postgres = { git = "https://github.com/MaterializeInc/rust-postgres" }
45 changes: 45 additions & 0 deletions src/adapter/src/coord/introspection.rs
@@ -21,6 +21,7 @@
use mz_repr::GlobalId;
use mz_sql::catalog::SessionCatalog;
use mz_sql::plan::Plan;
use smallvec::SmallVec;

use crate::catalog::{Catalog, Cluster};
use crate::notice::AdapterNotice;
@@ -80,6 +81,50 @@ pub fn auto_run_on_introspection<'a, 's>(
}
}

/// Checks whether the session is currently running on the [`MZ_INTROSPECTION_CLUSTER`] and,
/// if so, whether the plan depends on any objects that may not be queried from that cluster.
pub fn check_cluster_restrictions(
catalog: &impl SessionCatalog,
plan: &Plan,
depends_on: &Vec<GlobalId>,
) -> Result<(), AdapterError> {
// We only impose restrictions if the current cluster is the introspection cluster.
let cluster = catalog.active_cluster();
if cluster != MZ_INTROSPECTION_CLUSTER.name {
return Ok(());
}

// Always allow EXPLAIN queries, even when they reference restricted objects.
if let Plan::Explain(_) = plan {
Contributor

Minor concern: I'm not convinced this is helpful? We'd let them run an explain and then when they do a real query it'd error? Also fine leaving as is.

Contributor Author

I opted to allow all EXPLAIN queries because I pictured a flow where we reject a query for referencing non-system tables, and I wanted the user to be able to understand why we rejected it (i.e., by using EXPLAIN) without having to switch clusters. It's a minor detail, and I'm totally fine with removing it if we think the inconsistency will cause more confusion.

return Ok(());
}

// Collect any items that are not allowed to be run on the introspection cluster.
let unallowed_dependents: SmallVec<[String; 2]> = depends_on
.iter()
.filter_map(|id| {
let item = catalog.get_item(id);
let full_name = catalog.resolve_full_name(item.name());

if !catalog.is_system_schema(&full_name.schema) {
Some(full_name.to_string())
} else {
None
}
})
.collect();

// If the query depends on unallowed items, error out.
if !unallowed_dependents.is_empty() {
Err(AdapterError::UnallowedOnCluster {
depends_on: unallowed_dependents,
cluster: MZ_INTROSPECTION_CLUSTER.name.to_string(),
})
} else {
Ok(())
}
}

/// TODO(jkosh44) This function will verify the privileges for the mz_introspection user.
/// All of the privileges are hard coded into this function. In the future if we ever add
/// a more robust privileges framework, then this function should be replaced with that
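The decision made by `check_cluster_restrictions` can be modeled in a few lines of Python. This is a sketch of the logic only, not a port of the Rust: `FullName` and `unallowed_dependencies` are hypothetical names, and the contents of `SYSTEM_SCHEMAS` are an assumption standing in for whatever `catalog.is_system_schema` actually accepts.

```python
from dataclasses import dataclass
from typing import List, Optional

MZ_INTROSPECTION_CLUSTER = "mz_introspection"

# Assumed set of system schemas, for illustration only.
SYSTEM_SCHEMAS = {"mz_catalog", "mz_internal", "pg_catalog", "information_schema"}


@dataclass(frozen=True)
class FullName:
    """A resolved object name; system objects may have no database part."""

    database: Optional[str]
    schema: str
    item: str

    def __str__(self) -> str:
        parts = [p for p in (self.database, self.schema, self.item) if p]
        return ".".join(parts)


def unallowed_dependencies(
    active_cluster: str, is_explain: bool, depends_on: List[FullName]
) -> List[str]:
    """Return the names that block the query; an empty list means it may run."""
    # Restrictions apply only on the introspection cluster.
    if active_cluster != MZ_INTROSPECTION_CLUSTER:
        return []
    # EXPLAIN plans are let through regardless of what they reference.
    if is_explain:
        return []
    # Anything outside a system schema is off limits.
    return [str(name) for name in depends_on if name.schema not in SYSTEM_SCHEMAS]
```

In the Rust version a non-empty result becomes `AdapterError::UnallowedOnCluster`, and the sequencer sends that error back to the session before the plan is executed.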
5 changes: 5 additions & 0 deletions src/adapter/src/coord/sequencer.rs
@@ -72,6 +72,11 @@ impl Coordinator {
{
return tx.send(Err(e), session);
}
if let Err(e) =
introspection::check_cluster_restrictions(&session_catalog, &plan, &depends_on)
{
return tx.send(Err(e), session);
}

match plan {
Plan::CreateSource(plan) => {
21 changes: 21 additions & 0 deletions src/adapter/src/error.rs
@@ -15,6 +15,7 @@ use std::num::TryFromIntError;
use dec::TryFromDecimalError;
use itertools::Itertools;
use mz_repr::adt::timestamp::TimestampError;
use smallvec::SmallVec;
use tokio::sync::oneshot;

use mz_compute_client::controller::error as compute_error;
@@ -145,6 +146,11 @@ pub enum AdapterError {
SubscribeOnlyTransaction,
/// An error occurred in the MIR stage of the optimizer.
Transform(TransformError),
/// A query depends on items which are not allowed to be referenced from the current cluster.
UnallowedOnCluster {
depends_on: SmallVec<[String; 2]>,
cluster: String,
},
/// A user tried to perform an action that they were unauthorized to do.
Unauthorized(rbac::UnauthorizedError),
/// The specified function cannot be called
@@ -318,6 +324,10 @@ impl AdapterError {
),
AdapterError::PlanError(e) => e.hint(),
AdapterError::VarError(e) => e.hint(),
AdapterError::UnallowedOnCluster { .. } => Some(
"Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."
.into(),
),
_ => None,
}
}
@@ -443,6 +453,17 @@ impl fmt::Display for AdapterError {
AdapterError::UncallableFunction { func, context } => {
write!(f, "cannot call {} in {}", func, context)
}
AdapterError::UnallowedOnCluster {
depends_on,
cluster,
} => {
let items = depends_on.into_iter().map(|item| item.quoted()).join(", ");
write!(
f,
"querying the following items {items} is not allowed from the {} cluster",
cluster.quoted()
)
}
AdapterError::Unauthorized(unauthorized) => {
write!(f, "{unauthorized}")
}
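For readers who want to see the resulting text, the `Display` arm and the hint for `UnallowedOnCluster` can be approximated as follows. `format_unallowed_on_cluster` is a hypothetical helper, and the quoting is simplified to wrapping each name in double quotes; any escaping that the Rust `quoted()` helper performs is not reproduced here.

```python
# Hint text copied from the diff.
HINT = "Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query."


def format_unallowed_on_cluster(depends_on, cluster):
    """Build the error message for a query rejected on a restricted cluster."""
    # Simplified quoting: wrap in double quotes, no escaping (an assumption).
    items = ", ".join(f'"{name}"' for name in depends_on)
    return (
        f"querying the following items {items} "
        f'is not allowed from the "{cluster}" cluster'
    )


message = format_unallowed_on_cluster(["materialize.public.mv"], "mz_introspection")
```

The message built here has the same shape as the error text expected by the updated `materialized_views.slt` test, where the dots in the object name are escaped because the expectation is a regular expression.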
3 changes: 3 additions & 0 deletions src/pgwire/src/message.rs
@@ -362,6 +362,9 @@ impl ErrorResponse {
AdapterError::SqlCatalog(_) => SqlState::INTERNAL_ERROR,
AdapterError::SubscribeOnlyTransaction => SqlState::INVALID_TRANSACTION_STATE,
AdapterError::Transform(_) => SqlState::INTERNAL_ERROR,
AdapterError::UnallowedOnCluster { .. } => {
SqlState::S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED
}
Comment on lines +365 to +367
Contributor

Sorry, I know I suggested this one, but right after I realized that this is not correct either. These are meant for SQL routines (https://www.postgresql.org/docs/current/infoschema-routines.html), which are functions and procedures.

Contributor Author

No worries! Looking through all of the error codes the following seem to be somewhat decent:

  • protocol_violation - Class 08 — Connection Exception
  • invalid_parameter_value - Class 22 — Data Exception (not sure what classifies as a "parameter" though)
  • system_error - Class 58 — System Error (errors external to PostgreSQL itself)

system_error seems to be the most generic, so that's what I'm leaning towards, what do you think?

Contributor

Hmm, `invalid_parameter_value` wouldn't work because "parameter" refers to parameterized queries like `SELECT * FROM $1`.
I don't think `system_error` makes sense either, because the error is not external to Materialize. `protocol_violation` refers to the pgwire protocol, which this query doesn't violate.

I actually think the privilege error is the best one the more I think about it. There's nothing inherently wrong with the query, we have just made it forbidden when using a specific cluster.

Sorry to be nit-picky on this.

AdapterError::Unauthorized(_) => SqlState::INSUFFICIENT_PRIVILEGE,
AdapterError::UncallableFunction { .. } => SqlState::FEATURE_NOT_SUPPORTED,
AdapterError::UnknownCursor(_) => SqlState::INVALID_CURSOR_NAME,
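The candidates weighed in the thread above can be laid side by side with their five-character SQLSTATE codes from the PostgreSQL error code appendix. The table below is only a reading aid; `SQLSTATE_CANDIDATES`, `SQLSTATE_CLASSES`, and `describe` are hypothetical names, and the snippet makes no claim about which code was ultimately merged.

```python
# Condition name -> SQLSTATE code, per the PostgreSQL error codes appendix.
SQLSTATE_CANDIDATES = {
    "prohibited_sql_statement_attempted": "2F003",
    "protocol_violation": "08P01",
    "invalid_parameter_value": "22023",
    "system_error": "58000",
    "insufficient_privilege": "42501",
}

# The first two characters of a SQLSTATE code identify its class.
SQLSTATE_CLASSES = {
    "2F": "SQL Routine Exception",
    "08": "Connection Exception",
    "22": "Data Exception",
    "58": "System Error",
    "42": "Syntax Error or Access Rule Violation",
}


def describe(condition: str) -> str:
    """Render a candidate as 'name (code, class)'."""
    code = SQLSTATE_CANDIDATES[condition]
    return f"{condition} ({code}, {SQLSTATE_CLASSES[code[:2]]})"
```

Seen this way, the reviewer's objections line up with the classes: `2F` is reserved for SQL routines, `08` for connection and protocol problems, and `58` for failures outside the database, which leaves the access rule class as the closest fit for a query that is well formed but forbidden on one cluster.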
12 changes: 8 additions & 4 deletions test/sqllogictest/materialized_views.slt
@@ -258,11 +258,15 @@ materialize.public.mv CREATE␠MATERIALIZED␠VIEW␠"materialize"."public"."mv"

# Test: SHOW CREATE MATERIALIZED VIEW as mz_introspection

-simple conn=mz_introspection,user=mz_introspection

+statement ok
+SET cluster = mz_introspection

+statement error querying the following items "materialize\.public\.mv" is not allowed from the "mz_introspection" cluster\nHINT: Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query.
SHOW CREATE MATERIALIZED VIEW mv
-----
-materialize.public.mv,CREATE MATERIALIZED VIEW "materialize"."public"."mv" IN CLUSTER "default" AS SELECT 1
-COMPLETE 1

+statement ok
+RESET cluster

# Test: SHOW MATERIALIZED VIEWS

14 changes: 10 additions & 4 deletions test/sqllogictest/operator.slt
@@ -129,11 +129,17 @@ SHOW CREATE VIEW ADDER
materialize.public.adder
CREATE VIEW "materialize"."public"."adder" AS SELECT 2 + 2

-simple conn=mz_introspection,user=mz_introspection

+# Test: Show user view on mz_introspection

+statement ok
+SET cluster = mz_introspection

+statement error querying the following items "materialize\.public\.adder" is not allowed from the "mz_introspection" cluster\nHINT: Use `SET CLUSTER = <cluster-name>` to change your cluster and re-run the query.
SHOW CREATE VIEW ADDER
-----
-materialize.public.adder,CREATE VIEW "materialize"."public"."adder" AS SELECT 2 + 2
-COMPLETE 1

+statement ok
+RESET cluster

statement ok
CREATE VIEW MULTIPLIER AS SELECT 2 OPERATOR(*) 2;