Skip to content

Commit

Permalink
sql: Ensure priv role refs are always valid
Browse files Browse the repository at this point in the history
This commit prevents dropping a role, if that role is a grantee or
grantor for some object privilege. This helps ensure that the grantee
and grantor references in the catalog always remain valid.

Part of #11579
  • Loading branch information
jkosh44 committed Apr 16, 2023
1 parent 15fdc31 commit 776a586
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 18 deletions.
5 changes: 5 additions & 0 deletions src/adapter/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2424,6 +2424,11 @@ impl CatalogEntry {
pub fn owner_id(&self) -> &RoleId {
&self.owner_id
}

/// Returns the privileges of the entry.
pub fn privileges(&self) -> &Vec<MzAclItem> {
&self.privileges
}
}

struct AllocatedBuiltinSystemIds<T> {
Expand Down
79 changes: 69 additions & 10 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use mz_expr::{
use mz_ore::collections::CollectionExt;
use mz_ore::result::ResultExt as OreResultExt;
use mz_ore::task;
use mz_repr::adt::mz_acl_item::MzAclItem;
use mz_repr::explain::{ExplainFormat, Explainee};
use mz_repr::role_id::RoleId;
use mz_repr::{Datum, Diff, GlobalId, RelationDesc, Row, RowArena, Timestamp};
Expand Down Expand Up @@ -1394,7 +1395,7 @@ impl Coordinator {
let mut dropped_active_db = false;
let mut dropped_active_cluster = false;

let mut dropped_roles = plan
let mut dropped_roles: BTreeMap<_, _> = plan
.ids
.iter()
.filter_map(|id| match id {
Expand All @@ -1406,6 +1407,9 @@ impl Coordinator {
(*id, name)
})
.collect();
for role_id in dropped_roles.keys() {
self.catalog().ensure_not_reserved_role(&role_id)?;
}
self.validate_dropped_role_ownership(&dropped_roles)?;
// If any role is a member of a dropped role, then we must revoke that membership.
let dropped_role_ids: BTreeSet<_> = dropped_roles.keys().collect();
Expand Down Expand Up @@ -1478,50 +1482,105 @@ impl Coordinator {
&self,
dropped_roles: &BTreeMap<RoleId, &str>,
) -> Result<(), AdapterError> {
fn privilege_check(
privileges: &Vec<MzAclItem>,
dropped_roles: &BTreeMap<RoleId, &str>,
dependent_objects: &mut BTreeMap<String, Vec<String>>,
object_name: &str,
object_type: &str,
) {
for privilege in privileges {
if let Some(role_name) = dropped_roles.get(&privilege.grantee) {
dependent_objects
.entry(role_name.to_string())
.or_default()
.push(format!("privileges for {object_type} {object_name}",));
}
if let Some(role_name) = dropped_roles.get(&privilege.grantor) {
dependent_objects
.entry(role_name.to_string())
.or_default()
.push(format!("privileges for {object_type} {object_name}"));
}
}
}

let mut dependent_objects: BTreeMap<_, Vec<_>> = BTreeMap::new();
for entry in self.catalog.entries() {
if let Some(role_name) = dropped_roles.get(entry.owner_id()) {
dependent_objects
.entry(role_name.to_string())
.or_default()
.push(format!("{} {}", entry.item().typ(), entry.name().item));
}
.push(format!(
"owner of {} {}",
entry.item().typ(),
entry.name().item
));
}
privilege_check(
entry.privileges(),
dropped_roles,
&mut dependent_objects,
&entry.item().typ().to_string(),
&entry.name().item,
);
}
for database in self.catalog.databases() {
if let Some(role_name) = dropped_roles.get(&database.owner_id) {
dependent_objects
.entry(role_name.to_string())
.or_default()
.push(format!("database {}", database.name()));
}
.push(format!("owner of database {}", database.name()));
}
privilege_check(
&database.privileges,
dropped_roles,
&mut dependent_objects,
database.name(),
"database",
);
for schema in database.schemas_by_id.values() {
if let Some(role_name) = dropped_roles.get(&schema.owner_id) {
dependent_objects
.entry(role_name.to_string())
.or_default()
.push(format!("schema {}", schema.name().schema));
.push(format!("owner of schema {}", schema.name().schema));
}
privilege_check(
&schema.privileges,
dropped_roles,
&mut dependent_objects,
&schema.name().schema,
"schema",
);
}
}
for cluster in self.catalog.clusters() {
if let Some(role_name) = dropped_roles.get(&cluster.owner_id) {
dependent_objects
.entry(role_name.to_string())
.or_default()
.push(format!("cluster {}", cluster.name()));
}
.push(format!("owner of cluster {}", cluster.name()));
}
privilege_check(
&cluster.privileges,
dropped_roles,
&mut dependent_objects,
cluster.name(),
"cluster",
);
for replica in cluster.replicas_by_id.values() {
if let Some(role_name) = dropped_roles.get(&replica.owner_id) {
dependent_objects
.entry(role_name.to_string())
.or_default()
.push(format!("cluster replica {}", replica.name));
.push(format!("owner of cluster replica {}", replica.name));
}
}
}

if !dependent_objects.is_empty() {
Err(AdapterError::DependentObjectOwnership(dependent_objects))
Err(AdapterError::DependentObject(dependent_objects))
} else {
Ok(())
}
Expand Down
14 changes: 7 additions & 7 deletions src/adapter/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,10 +193,10 @@ pub enum AdapterError {
Orchestrator(anyhow::Error),
/// The active role was dropped while a user was logged in.
ConcurrentRoleDrop(RoleId),
/// A statement tried to drop a role that still owned one or more objects.
/// A statement tried to drop a role that had dependent objects.
///
/// The map keys are role names and values are owned object names.
DependentObjectOwnership(BTreeMap<String, Vec<String>>),
/// The map keys are role names and values are detailed error messages.
DependentObject(BTreeMap<String, Vec<String>>),
}

impl AdapterError {
Expand Down Expand Up @@ -253,12 +253,12 @@ impl AdapterError {
AdapterError::VarError(e) => e.detail(),
AdapterError::ConcurrentRoleDrop(_) => Some("Please disconnect and re-connect with a valid role.".into()),
AdapterError::Unauthorized(unauthorized) => unauthorized.detail(),
AdapterError::DependentObjectOwnership(dependent_objects) => {
AdapterError::DependentObject(dependent_objects) => {
Some(dependent_objects
.iter()
.map(|(role_name, object_names)| object_names
.map(|(role_name, err_msgs)| err_msgs
.iter()
.map(|object_name| format!("{role_name} is owner of {object_name}"))
.map(|err_msg| format!("{role_name}: {err_msg}"))
.join("\n"))
.join("\n"))
},
Expand Down Expand Up @@ -488,7 +488,7 @@ impl fmt::Display for AdapterError {
AdapterError::ConcurrentRoleDrop(role_id) => {
write!(f, "role {role_id} was concurrently dropped")
}
AdapterError::DependentObjectOwnership(dependent_objects) => {
AdapterError::DependentObject(dependent_objects) => {
let role_str = if dependent_objects.keys().count() == 1 {
"role"
} else {
Expand Down
2 changes: 1 addition & 1 deletion src/pgwire/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ impl ErrorResponse {
SqlState::INTERNAL_ERROR
}
AdapterError::ConcurrentRoleDrop(_) => SqlState::UNDEFINED_OBJECT,
AdapterError::DependentObjectOwnership(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
AdapterError::DependentObject(_) => SqlState::DEPENDENT_OBJECTS_STILL_EXIST,
AdapterError::VarError(e) => match e {
VarError::ConstrainedParameter { .. } => SqlState::INVALID_PARAMETER_VALUE,
VarError::FixedValueParameter(_) => SqlState::INVALID_PARAMETER_VALUE,
Expand Down

0 comments on commit 776a586

Please sign in to comment.