From 1d81b9962b7c97d54d152a823870876dd5421a0a Mon Sep 17 00:00:00 2001 From: Parker Timmerman Date: Wed, 12 Apr 2023 18:22:20 -0400 Subject: [PATCH 1/5] start - add namespace to DatabaseKey, SchemaKey, and ItemValue - add stash migration - update Builtin tables and view - update sqllogictests - update testdrive --- src/adapter/src/catalog.rs | 4 +- src/adapter/src/catalog/builtin.rs | 24 +- .../src/catalog/builtin_table_updates.rs | 26 +- src/adapter/src/catalog/storage.rs | 251 ++++++++++++++++-- src/sql/src/names.rs | 132 ++++++--- src/sql/src/plan/statement/show.rs | 28 +- src/stash/src/lib.rs | 27 ++ test/sqllogictest/audit_log.slt | 16 +- test/sqllogictest/id_reuse.slt | 40 +-- test/sqllogictest/secret.slt | 6 +- test/sqllogictest/source_sizing.slt | 8 +- test/sqllogictest/system-cluster.slt | 30 +-- test/testdrive/tables.td | 2 +- .../create-in-v0.27.0-view-on-builtin.td | 2 +- 14 files changed, 443 insertions(+), 153 deletions(-) diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 3efd410b6452..8b34ac437c8c 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -8030,8 +8030,8 @@ mod tests { item, name: QualifiedItemName { qualifiers: ItemQualifiers { - database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::new(1)), - schema_spec: SchemaSpecifier::Id(SchemaId::new(3)), + database_spec: ResolvedDatabaseSpecifier::Id(DatabaseId::User(1)), + schema_spec: SchemaSpecifier::Id(SchemaId::User(3)), }, item: "v".to_string(), }, diff --git a/src/adapter/src/catalog/builtin.rs b/src/adapter/src/catalog/builtin.rs index 42c11919956e..29b1ea8e158c 100644 --- a/src/adapter/src/catalog/builtin.rs +++ b/src/adapter/src/catalog/builtin.rs @@ -1383,7 +1383,7 @@ pub static MZ_DATABASES: Lazy = Lazy::new(|| BuiltinTable { name: "mz_databases", schema: MZ_CATALOG_SCHEMA, desc: RelationDesc::empty() - .with_column("id", ScalarType::UInt64.nullable(false)) + .with_column("id", ScalarType::String.nullable(false)) .with_column("oid", 
ScalarType::Oid.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column("owner_id", ScalarType::String.nullable(false)), @@ -1393,9 +1393,9 @@ pub static MZ_SCHEMAS: Lazy = Lazy::new(|| BuiltinTable { name: "mz_schemas", schema: MZ_CATALOG_SCHEMA, desc: RelationDesc::empty() - .with_column("id", ScalarType::UInt64.nullable(false)) + .with_column("id", ScalarType::String.nullable(false)) .with_column("oid", ScalarType::Oid.nullable(false)) - .with_column("database_id", ScalarType::UInt64.nullable(true)) + .with_column("database_id", ScalarType::String.nullable(true)) .with_column("name", ScalarType::String.nullable(false)) .with_column("owner_id", ScalarType::String.nullable(false)), is_retained_metrics_object: false, @@ -1442,7 +1442,7 @@ pub static MZ_TABLES: Lazy = Lazy::new(|| BuiltinTable { desc: RelationDesc::empty() .with_column("id", ScalarType::String.nullable(false)) .with_column("oid", ScalarType::Oid.nullable(false)) - .with_column("schema_id", ScalarType::UInt64.nullable(false)) + .with_column("schema_id", ScalarType::String.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column("owner_id", ScalarType::String.nullable(false)), is_retained_metrics_object: false, @@ -1453,7 +1453,7 @@ pub static MZ_CONNECTIONS: Lazy = Lazy::new(|| BuiltinTable { desc: RelationDesc::empty() .with_column("id", ScalarType::String.nullable(false)) .with_column("oid", ScalarType::Oid.nullable(false)) - .with_column("schema_id", ScalarType::UInt64.nullable(false)) + .with_column("schema_id", ScalarType::String.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column("type", ScalarType::String.nullable(false)) .with_column("owner_id", ScalarType::String.nullable(false)), @@ -1474,7 +1474,7 @@ pub static MZ_SOURCES: Lazy = Lazy::new(|| BuiltinTable { desc: RelationDesc::empty() .with_column("id", ScalarType::String.nullable(false)) .with_column("oid", ScalarType::Oid.nullable(false)) - 
.with_column("schema_id", ScalarType::UInt64.nullable(false)) + .with_column("schema_id", ScalarType::String.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column("type", ScalarType::String.nullable(false)) .with_column("connection_id", ScalarType::String.nullable(true)) @@ -1490,7 +1490,7 @@ pub static MZ_SINKS: Lazy = Lazy::new(|| BuiltinTable { desc: RelationDesc::empty() .with_column("id", ScalarType::String.nullable(false)) .with_column("oid", ScalarType::Oid.nullable(false)) - .with_column("schema_id", ScalarType::UInt64.nullable(false)) + .with_column("schema_id", ScalarType::String.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column("type", ScalarType::String.nullable(false)) .with_column("connection_id", ScalarType::String.nullable(true)) @@ -1506,7 +1506,7 @@ pub static MZ_VIEWS: Lazy = Lazy::new(|| BuiltinTable { desc: RelationDesc::empty() .with_column("id", ScalarType::String.nullable(false)) .with_column("oid", ScalarType::Oid.nullable(false)) - .with_column("schema_id", ScalarType::UInt64.nullable(false)) + .with_column("schema_id", ScalarType::String.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column("definition", ScalarType::String.nullable(false)) .with_column("owner_id", ScalarType::String.nullable(false)), @@ -1518,7 +1518,7 @@ pub static MZ_MATERIALIZED_VIEWS: Lazy = Lazy::new(|| BuiltinTable desc: RelationDesc::empty() .with_column("id", ScalarType::String.nullable(false)) .with_column("oid", ScalarType::Oid.nullable(false)) - .with_column("schema_id", ScalarType::UInt64.nullable(false)) + .with_column("schema_id", ScalarType::String.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column("cluster_id", ScalarType::String.nullable(false)) .with_column("definition", ScalarType::String.nullable(false)) @@ -1531,7 +1531,7 @@ pub static MZ_TYPES: Lazy = Lazy::new(|| BuiltinTable { desc: RelationDesc::empty() 
.with_column("id", ScalarType::String.nullable(false)) .with_column("oid", ScalarType::Oid.nullable(false)) - .with_column("schema_id", ScalarType::UInt64.nullable(false)) + .with_column("schema_id", ScalarType::String.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column("category", ScalarType::String.nullable(false)) .with_column("owner_id", ScalarType::String.nullable(false)), @@ -1602,7 +1602,7 @@ pub static MZ_FUNCTIONS: Lazy = Lazy::new(|| BuiltinTable { desc: RelationDesc::empty() .with_column("id", ScalarType::String.nullable(false)) .with_column("oid", ScalarType::Oid.nullable(false)) - .with_column("schema_id", ScalarType::UInt64.nullable(false)) + .with_column("schema_id", ScalarType::String.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column( "argument_type_ids", @@ -1655,7 +1655,7 @@ pub static MZ_SECRETS: Lazy = Lazy::new(|| BuiltinTable { schema: MZ_CATALOG_SCHEMA, desc: RelationDesc::empty() .with_column("id", ScalarType::String.nullable(false)) - .with_column("schema_id", ScalarType::UInt64.nullable(false)) + .with_column("schema_id", ScalarType::String.nullable(false)) .with_column("name", ScalarType::String.nullable(false)) .with_column("owner_id", ScalarType::String.nullable(false)), is_retained_metrics_object: false, diff --git a/src/adapter/src/catalog/builtin_table_updates.rs b/src/adapter/src/catalog/builtin_table_updates.rs index 0bdcf3e364e1..c84461f9a2ab 100644 --- a/src/adapter/src/catalog/builtin_table_updates.rs +++ b/src/adapter/src/catalog/builtin_table_updates.rs @@ -91,7 +91,7 @@ impl CatalogState { BuiltinTableUpdate { id: self.resolve_builtin_table(&MZ_DATABASES), row: Row::pack_slice(&[ - Datum::UInt64(database.id.0), + Datum::String(&database.id.to_string()), Datum::UInt32(database.oid), Datum::String(database.name()), Datum::String(&database.owner_id.to_string()), @@ -109,16 +109,16 @@ impl CatalogState { let (database_id, schema) = match database_spec { 
ResolvedDatabaseSpecifier::Ambient => (None, &self.ambient_schemas_by_id[schema_id]), ResolvedDatabaseSpecifier::Id(id) => ( - Some(id.0), + Some(id.to_string()), &self.database_by_id[id].schemas_by_id[schema_id], ), }; BuiltinTableUpdate { id: self.resolve_builtin_table(&MZ_SCHEMAS), row: Row::pack_slice(&[ - Datum::UInt64(schema_id.0), + Datum::String(&schema_id.to_string()), Datum::UInt32(schema.oid), - Datum::from(database_id), + Datum::from(database_id.as_deref()), Datum::String(&schema.name.schema), Datum::String(&schema.owner_id.to_string()), ]), @@ -397,7 +397,7 @@ impl CatalogState { row: Row::pack_slice(&[ Datum::String(&id.to_string()), Datum::UInt32(oid), - Datum::UInt64(schema_id.into()), + Datum::String(&schema_id.to_string()), Datum::String(name), Datum::String(&owner_id.to_string()), ]), @@ -424,7 +424,7 @@ impl CatalogState { row: Row::pack_slice(&[ Datum::String(&id.to_string()), Datum::UInt32(oid), - Datum::UInt64(schema_id.into()), + Datum::String(&schema_id.to_string()), Datum::String(name), Datum::String(source_desc_name), Datum::from(connection_id.map(|id| id.to_string()).as_deref()), @@ -468,7 +468,7 @@ impl CatalogState { row: Row::pack_slice(&[ Datum::String(&id.to_string()), Datum::UInt32(oid), - Datum::UInt64(schema_id.into()), + Datum::String(&schema_id.to_string()), Datum::String(name), Datum::String(match connection.connection { mz_storage_client::types::connections::Connection::Kafka { .. 
} => "kafka", @@ -615,7 +615,7 @@ impl CatalogState { row: Row::pack_slice(&[ Datum::String(&id.to_string()), Datum::UInt32(oid), - Datum::UInt64(schema_id.into()), + Datum::String(&schema_id.to_string()), Datum::String(name), Datum::String(&query_string), Datum::String(&owner_id.to_string()), @@ -652,7 +652,7 @@ impl CatalogState { row: Row::pack_slice(&[ Datum::String(&id.to_string()), Datum::UInt32(oid), - Datum::UInt64(schema_id.into()), + Datum::String(&schema_id.to_string()), Datum::String(name), Datum::String(&mview.cluster_id.to_string()), Datum::String(&query_string), @@ -698,7 +698,7 @@ impl CatalogState { row: Row::pack_slice(&[ Datum::String(&id.to_string()), Datum::UInt32(oid), - Datum::UInt64(schema_id.into()), + Datum::String(&schema_id.to_string()), Datum::String(name), Datum::String(connection.name()), Datum::from(sink.connection_id().map(|id| id.to_string()).as_deref()), @@ -800,7 +800,7 @@ impl CatalogState { row: Row::pack_slice(&[ Datum::String(&id.to_string()), Datum::UInt32(oid), - Datum::UInt64(schema_id.into()), + Datum::String(&schema_id.to_string()), Datum::String(name), Datum::String(&TypeCategory::from_catalog_type(&typ.details.typ).to_string()), Datum::String(&owner_id.to_string()), @@ -882,7 +882,7 @@ impl CatalogState { row: Row::pack_slice(&[ Datum::String(&id.to_string()), Datum::UInt32(func_impl_details.oid), - Datum::UInt64(schema_id.into()), + Datum::String(&schema_id.to_string()), Datum::String(name), arg_type_ids, Datum::from( @@ -959,7 +959,7 @@ impl CatalogState { id: self.resolve_builtin_table(&MZ_SECRETS), row: Row::pack_slice(&[ Datum::String(&id.to_string()), - Datum::UInt64(schema_id.into()), + Datum::String(&schema_id.to_string()), Datum::String(name), Datum::String(&owner_id.to_string()), ]), diff --git a/src/adapter/src/catalog/storage.rs b/src/adapter/src/catalog/storage.rs index c181a1e3d6fa..5649713e1b35 100644 --- a/src/adapter/src/catalog/storage.rs +++ b/src/adapter/src/catalog/storage.rs @@ -182,6 +182,7 @@ 
async fn migrate( txn.databases.insert( DatabaseKey { id: MATERIALIZE_DATABASE_ID, + ns: Some(DatabaseNamespace::User), }, DatabaseValue { name: "materialize".into(), @@ -209,9 +210,11 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: MZ_CATALOG_SCHEMA_ID, + ns: Some(SchemaNamespace::User), }, SchemaValue { database_id: None, + database_ns: None, name: "mz_catalog".into(), owner_id: Some(MZ_SYSTEM_ROLE_ID), }, @@ -219,9 +222,11 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: PG_CATALOG_SCHEMA_ID, + ns: Some(SchemaNamespace::User), }, SchemaValue { database_id: None, + database_ns: None, name: "pg_catalog".into(), owner_id: Some(MZ_SYSTEM_ROLE_ID), }, @@ -229,9 +234,11 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: PUBLIC_SCHEMA_ID, + ns: Some(SchemaNamespace::User), }, SchemaValue { database_id: Some(MATERIALIZE_DATABASE_ID), + database_ns: Some(DatabaseNamespace::User), name: "public".into(), owner_id: Some(MZ_SYSTEM_ROLE_ID), }, @@ -258,9 +265,11 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: MZ_INTERNAL_SCHEMA_ID, + ns: Some(SchemaNamespace::User), }, SchemaValue { database_id: None, + database_ns: None, name: "mz_internal".into(), owner_id: Some(MZ_SYSTEM_ROLE_ID), }, @@ -268,9 +277,11 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: INFORMATION_SCHEMA_ID, + ns: Some(SchemaNamespace::User), }, SchemaValue { database_id: None, + database_ns: None, name: "information_schema".into(), owner_id: Some(MZ_SYSTEM_ROLE_ID), }, @@ -566,6 +577,66 @@ async fn migrate( Ok(()) }, + // Namespacing database ids and schema ids by User or System. Currently + // everything exists in the "User" schema. + // + // Introduced in v0.51.0 + // + // TODO(parkertimmerman): We can make DatabaseKey and SchemaKey more idomatic + // enums after this upgrade. + |txn: &mut Transaction<'_>, _now, _bootstrap_args| { + // Migrate all of our DatabaseKeys. 
+ txn.databases.migrate(|key, value| { + match key.ns { + // Set all keys without a namespace to the User namespace. + None => { + let new_key = DatabaseKey { + id: key.id, + ns: Some(DatabaseNamespace::User), + }; + + Some((new_key, value.clone())) + } + // If a namespace is already set, there is nothing to do. + Some(_) => None, + } + })?; + + // Migrate all of our SchemaKeys. + txn.schemas.migrate(|key, value| { + match key.ns { + // Set all keys without a namespace to the User namespace. + None => { + let new_key = SchemaKey { + id: key.id, + ns: Some(SchemaNamespace::User), + }; + + Some((new_key, value.clone())) + } + // If a namespace is already set, there is nothing to do. + Some(_) => None, + } + })?; + + // Migrate all of our existing items, to set the Schema Namespace + txn.items.update(|_key, value| { + match value.schema_ns { + // Set all schema namespaces to User. + None => { + let mut new_value = value.clone(); + let prev = new_value.schema_ns.replace(SchemaNamespace::User); + assert!(prev.is_none(), "Logic changed, should be None"); + + Some(new_value) + } + // If a schema namespace is already set, there is nothing to do. + Some(_) => None, + } + })?; + + Ok(()) + }, // Add new migrations above. 
// // Migrations should be preceded with a comment of the following form: @@ -894,7 +965,7 @@ impl Connection { .into_iter() .map(|(k, v)| { ( - DatabaseId::new(k.id), + DatabaseId::from(k), v.name, v.owner_id.expect("owner ID not migrated"), ) @@ -912,9 +983,9 @@ impl Connection { .into_iter() .map(|(k, v)| { ( - SchemaId::new(k.id), + SchemaId::from(k), v.name, - v.database_id.map(DatabaseId::new), + v.database_id.map(DatabaseId::User), v.owner_id.expect("owner ID not migrated"), ) }) @@ -1404,7 +1475,10 @@ impl<'a> Transaction<'a> { let schemas = self.schemas.items(); let mut items = Vec::new(); self.items.for_values(|k, v| { - let schema = match schemas.get(&SchemaKey { id: v.schema_id }) { + let schema = match schemas.get(&SchemaKey { + id: v.schema_id, + ns: v.schema_ns, + }) { Some(schema) => schema, None => panic!( "corrupt stash! unknown schema id {}, for item with key \ @@ -1414,13 +1488,17 @@ impl<'a> Transaction<'a> { }; let database_spec = match schema.database_id { Some(id) => { - if databases.get(&DatabaseKey { id }).is_none() { + let key = DatabaseKey { + id, + ns: schema.database_ns, + }; + if databases.get(&key).is_none() { panic!( "corrupt stash! unknown database id {id}, for item with key \ {k:?} and value {v:?}" ); } - ResolvedDatabaseSpecifier::from(id) + ResolvedDatabaseSpecifier::from(DatabaseId::from(key)) } None => ResolvedDatabaseSpecifier::Ambient, }; @@ -1457,13 +1535,18 @@ impl<'a> Transaction<'a> { ) -> Result { let id = self.get_and_increment_id(DATABASE_ID_ALLOC_KEY.to_string())?; match self.databases.insert( - DatabaseKey { id }, + DatabaseKey { + id, + // TODO(parkertimmerman): Support creating databases in the System namespace. + ns: Some(DatabaseNamespace::User), + }, DatabaseValue { name: database_name.to_string(), owner_id: Some(owner_id), }, ) { - Ok(_) => Ok(DatabaseId::new(id)), + // TODO(parkertimmerman): Support creating databases in the System namespace. 
+ Ok(_) => Ok(DatabaseId::User(id)), Err(_) => Err(Error::new(ErrorKind::DatabaseAlreadyExists( database_name.to_owned(), ))), @@ -1477,15 +1560,22 @@ impl<'a> Transaction<'a> { owner_id: RoleId, ) -> Result { let id = self.get_and_increment_id(SCHEMA_ID_ALLOC_KEY.to_string())?; + let db = DatabaseKey::from(database_id); match self.schemas.insert( - SchemaKey { id }, + SchemaKey { + id, + // TODO(parkertimmerman): Support creating schemas in the System namespace. + ns: Some(SchemaNamespace::User), + }, SchemaValue { - database_id: Some(database_id.0), + database_id: Some(db.id), + database_ns: db.ns, name: schema_name.to_string(), owner_id: Some(owner_id), }, ) { - Ok(_) => Ok(SchemaId::new(id)), + // TODO(parkertimmerman): Support creating schemas in the System namespace. + Ok(_) => Ok(SchemaId::User(id)), Err(_) => Err(Error::new(ErrorKind::SchemaAlreadyExists( schema_name.to_owned(), ))), @@ -1643,10 +1733,12 @@ impl<'a> Transaction<'a> { item: SerializedCatalogItem, owner_id: RoleId, ) -> Result<(), Error> { + let schema_key = SchemaKey::from(schema_id); match self.items.insert( ItemKey { gid: id }, ItemValue { - schema_id: schema_id.0, + schema_id: schema_key.id, + schema_ns: schema_key.ns, name: item_name.to_string(), definition: item, owner_id: Some(owner_id), @@ -1678,7 +1770,7 @@ impl<'a> Transaction<'a> { } pub fn remove_database(&mut self, id: &DatabaseId) -> Result<(), Error> { - let prev = self.databases.set(DatabaseKey { id: id.0 }, None)?; + let prev = self.databases.set(DatabaseKey::from(*id), None)?; if prev.is_some() { Ok(()) } else { @@ -1691,11 +1783,11 @@ impl<'a> Transaction<'a> { database_id: &DatabaseId, schema_id: &SchemaId, ) -> Result<(), Error> { - let prev = self.schemas.set(SchemaKey { id: schema_id.0 }, None)?; + let prev = self.schemas.set(SchemaKey::from(*schema_id), None)?; if prev.is_some() { Ok(()) } else { - Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_id.0, schema_id.0)).into()) + 
Err(SqlCatalogError::UnknownSchema(format!("{}.{}", database_id, schema_id)).into()) } } @@ -1790,6 +1882,7 @@ impl<'a> Transaction<'a> { if k.gid == id { Some(ItemValue { schema_id: v.schema_id, + schema_ns: v.schema_ns, name: item_name.to_string(), definition: item.clone(), owner_id: v.owner_id, @@ -1821,6 +1914,7 @@ impl<'a> Transaction<'a> { if let Some((item_name, item)) = items.get(&k.gid) { Some(ItemValue { schema_id: v.schema_id, + schema_ns: v.schema_ns, name: item_name.clone(), definition: item.clone(), owner_id: v.owner_id, @@ -1972,7 +2066,7 @@ impl<'a> Transaction<'a> { /// DO NOT call this function in a loop. pub fn update_database(&mut self, id: DatabaseId, database: &Database) -> Result<(), Error> { let n = self.databases.update(|k, _v| { - if k.id == id.0 { + if id == DatabaseId::from(*k) { Some(DatabaseValue { name: database.name().to_string(), owner_id: Some(database.owner_id), @@ -2002,9 +2096,15 @@ impl<'a> Transaction<'a> { schema: &Schema, ) -> Result<(), Error> { let n = self.schemas.update(|k, _v| { - if k.id == schema_id.0 { + let (db_id, db_ns) = match database_id { + None => (None, None), + Some(DatabaseId::System(id)) => (Some(id), Some(DatabaseNamespace::System)), + Some(DatabaseId::User(id)) => (Some(id), Some(DatabaseNamespace::User)), + }; + if schema_id == SchemaId::from(*k) { Some(SchemaValue { - database_id: database_id.map(|id| id.0), + database_id: db_id, + database_ns: db_ns, name: schema.name().schema.clone(), owner_id: Some(schema.owner_id), }) @@ -2439,9 +2539,44 @@ pub struct ClusterIntrospectionSourceIndexValue { index_id: u64, } -#[derive(Clone, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Hash)] +#[derive( + Default, Debug, Clone, Copy, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Hash, +)] +pub enum DatabaseNamespace { + #[default] + User, + System, +} + +#[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Hash)] pub struct DatabaseKey { id: u64, + 
#[serde(skip_serializing_if = "Option::is_none")] + ns: Option, +} + +impl From for DatabaseId { + fn from(value: DatabaseKey) -> Self { + match value.ns { + None | Some(DatabaseNamespace::User) => DatabaseId::User(value.id), + Some(DatabaseNamespace::System) => DatabaseId::System(value.id), + } + } +} + +impl From for DatabaseKey { + fn from(value: DatabaseId) -> Self { + match value { + DatabaseId::User(id) => DatabaseKey { + id, + ns: Some(DatabaseNamespace::User), + }, + DatabaseId::System(id) => DatabaseKey { + id, + ns: Some(DatabaseNamespace::System), + }, + } + } } #[derive(Clone, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Hash)] @@ -2466,14 +2601,51 @@ pub struct DatabaseValue { owner_id: Option, } -#[derive(Clone, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Hash)] +#[derive( + Default, Debug, Copy, Clone, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Hash, +)] +pub enum SchemaNamespace { + #[default] + User, + System, +} + +#[derive(Clone, Copy, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Hash)] pub struct SchemaKey { id: u64, + #[serde(skip_serializing_if = "Option::is_none")] + ns: Option, +} + +impl From for SchemaId { + fn from(value: SchemaKey) -> Self { + match value.ns { + None | Some(SchemaNamespace::User) => SchemaId::User(value.id), + Some(SchemaNamespace::System) => SchemaId::System(value.id), + } + } +} + +impl From for SchemaKey { + fn from(value: SchemaId) -> Self { + match value { + SchemaId::User(id) => SchemaKey { + id, + ns: Some(SchemaNamespace::User), + }, + SchemaId::System(id) => SchemaKey { + id, + ns: Some(SchemaNamespace::System), + }, + } + } } #[derive(Clone, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)] pub struct SchemaValue { database_id: Option, + #[serde(skip_serializing_if = "Option::is_none")] + database_ns: Option, name: String, // TODO(jkosh44) Remove option in v0.50.0 owner_id: Option, @@ -2487,6 +2659,8 @@ pub struct ItemKey { #[derive(Clone, Deserialize, 
Serialize, PartialOrd, PartialEq, Eq, Ord, Debug)] pub struct ItemValue { schema_id: u64, + #[serde(skip_serializing_if = "Option::is_none")] + schema_ns: Option, name: String, definition: SerializedCatalogItem, // TODO(jkosh44) Remove option in v0.50.0 @@ -2589,3 +2763,40 @@ pub static ALL_COLLECTIONS: &[&str] = &[ COLLECTION_AUDIT_LOG.name(), COLLECTION_STORAGE_USAGE.name(), ]; + +#[cfg(test)] +mod test { + use super::{DatabaseKey, DatabaseNamespace}; + + #[test] + fn test_database_key_roundtrips() { + let k_none = DatabaseKey { id: 42, ns: None }; + let k_user = DatabaseKey { + id: 24, + ns: Some(DatabaseNamespace::User), + }; + let k_sys = DatabaseKey { + id: 0, + ns: Some(DatabaseNamespace::System), + }; + + for key in [k_none, k_user, k_sys] { + let og_json = serde_json::to_string(&key).expect("valid key"); + + // Make sure our type roundtrips. + let after: DatabaseKey = serde_json::from_str(&og_json).expect("valid json"); + assert_eq!(after, key); + + // Our JSON should roundtrip too. 
+ let af_json = serde_json::to_string(&after).expect("valid key"); + assert_eq!(og_json, af_json); + } + + let json_none = serde_json::json!({ "id": 42 }).to_string(); + + let key: DatabaseKey = serde_json::from_str(&json_none).expect("valid json"); + let json_after = serde_json::to_string(&key).expect("valid key"); + + assert_eq!(json_none, json_after); + } +} diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index 1ad350912695..b7284a913c87 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -275,9 +275,9 @@ impl AstDisplay for ResolvedDatabaseSpecifier { } } -impl From for ResolvedDatabaseSpecifier { - fn from(id: u64) -> Self { - Self::Id(DatabaseId(id)) +impl From for ResolvedDatabaseSpecifier { + fn from(id: DatabaseId) -> Self { + Self::Id(id) } } @@ -320,7 +320,7 @@ impl From for SchemaSpecifier { if id == Self::TEMPORARY_SCHEMA_ID { Self::Temporary } else { - Self::Id(SchemaId(id)) + Self::Id(SchemaId::User(id)) } } } @@ -328,7 +328,7 @@ impl From for SchemaSpecifier { impl From<&SchemaSpecifier> for SchemaId { fn from(schema_spec: &SchemaSpecifier) -> Self { match schema_spec { - SchemaSpecifier::Temporary => SchemaId(SchemaSpecifier::TEMPORARY_SCHEMA_ID), + SchemaSpecifier::Temporary => SchemaId::User(SchemaSpecifier::TEMPORARY_SCHEMA_ID), SchemaSpecifier::Id(id) => id.clone(), } } @@ -337,24 +337,12 @@ impl From<&SchemaSpecifier> for SchemaId { impl From for SchemaId { fn from(schema_spec: SchemaSpecifier) -> Self { match schema_spec { - SchemaSpecifier::Temporary => SchemaId(SchemaSpecifier::TEMPORARY_SCHEMA_ID), + SchemaSpecifier::Temporary => SchemaId::User(SchemaSpecifier::TEMPORARY_SCHEMA_ID), SchemaSpecifier::Id(id) => id, } } } -impl From<&SchemaSpecifier> for u64 { - fn from(schema_spec: &SchemaSpecifier) -> Self { - SchemaId::from(schema_spec).0 - } -} - -impl From for u64 { - fn from(schema_spec: SchemaSpecifier) -> Self { - SchemaId::from(schema_spec).0 - } -} - // Aug is the type variable assigned to an AST that has 
already been // name-resolved. An AST in this state has global IDs populated next to table // names, and local IDs assigned to CTE definitions and references. @@ -632,19 +620,42 @@ impl AstInfo for Aug { /// The identifier for a schema. #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] -pub struct SchemaId(pub u64); +#[serde(into = "SchemaIdJson")] +#[serde(try_from = "SchemaIdJson")] +pub enum SchemaId { + User(u64), + System(u64), +} + +/// A type that specifies how we serialize and deserialize a [`SchemaId`]. +/// +/// Note: JSON maps require their keys to be strings, and we use [`SchemaId`] as the key +/// in some maps, so we need to serialize them as plain strings, which is what this type +/// does. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(transparent)] +struct SchemaIdJson(String); + +impl From for SchemaIdJson { + fn from(id: SchemaId) -> SchemaIdJson { + SchemaIdJson(id.to_string()) + } +} + +impl TryFrom for SchemaId { + type Error = PlanError; -impl SchemaId { - /// Constructs a new schema identifier. It is the caller's responsibility - /// to provide a unique `id`. 
- pub fn new(id: u64) -> Self { - SchemaId(id) + fn try_from(value: SchemaIdJson) -> Result { + SchemaId::from_str(&value.0) } } impl fmt::Display for SchemaId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) + match self { + SchemaId::System(id) => write!(f, "s{}", id), + SchemaId::User(id) => write!(f, "u{}", id), + } } } @@ -652,26 +663,62 @@ impl FromStr for SchemaId { type Err = PlanError; fn from_str(s: &str) -> Result { - let val: u64 = s.parse()?; - Ok(SchemaId(val)) + if s.len() < 2 { + return Err(PlanError::Unstructured(format!( + "couldn't parse SchemaId {}", + s + ))); + } + let val: u64 = s[1..].parse()?; + match s.chars().next() { + Some('s') => Ok(SchemaId::System(val)), + Some('u') => Ok(SchemaId::User(val)), + _ => Err(PlanError::Unstructured(format!( + "couldn't parse SchemaId {}", + s + ))), + } } } /// The identifier for a database. #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] -pub struct DatabaseId(pub u64); +#[serde(into = "DatabaseIdJson")] +#[serde(try_from = "DatabaseIdJson")] +pub enum DatabaseId { + User(u64), + System(u64), +} + +/// A type that specifies how we serialize and deserialize a [`DatabaseId`]. +/// +/// Note: JSON maps require their keys to be strings, and we use [`DatabaseId`] as the key +/// in some maps, so we need to serialize them as plain strings, which is what this type +/// does. +#[derive(Clone, Debug, Serialize, Deserialize)] +#[serde(transparent)] +struct DatabaseIdJson(String); -impl DatabaseId { - /// Constructs a new database identifier. It is the caller's responsibility - /// to provide a unique `id`. 
- pub fn new(id: u64) -> Self { - DatabaseId(id) +impl From for DatabaseIdJson { + fn from(id: DatabaseId) -> DatabaseIdJson { + DatabaseIdJson(id.to_string()) + } +} + +impl TryFrom for DatabaseId { + type Error = PlanError; + + fn try_from(value: DatabaseIdJson) -> Result { + DatabaseId::from_str(&value.0) } } impl fmt::Display for DatabaseId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "{}", self.0) + match self { + DatabaseId::System(id) => write!(f, "s{}", id), + DatabaseId::User(id) => write!(f, "u{}", id), + } } } @@ -679,8 +726,21 @@ impl FromStr for DatabaseId { type Err = PlanError; fn from_str(s: &str) -> Result { - let val: u64 = s.parse()?; - Ok(DatabaseId(val)) + if s.len() < 2 { + return Err(PlanError::Unstructured(format!( + "couldn't parse DatabaseId {}", + s + ))); + } + let val: u64 = s[1..].parse()?; + match s.chars().next() { + Some('s') => Ok(DatabaseId::System(val)), + Some('u') => Ok(DatabaseId::User(val)), + _ => Err(PlanError::Unstructured(format!( + "couldn't parse DatabaseId {}", + s + ))), + } } } diff --git a/src/sql/src/plan/statement/show.rs b/src/sql/src/plan/statement/show.rs index f090f092aa27..24ee030639b6 100644 --- a/src/sql/src/plan/statement/show.rs +++ b/src/sql/src/plan/statement/show.rs @@ -280,9 +280,9 @@ pub fn show_schemas<'a>( filter: Option>, ) -> Result, PlanError> { let database_id = match from { - Some(ResolvedDatabaseName::Database { id, .. }) => id.0, + Some(ResolvedDatabaseName::Database { id, .. 
}) => id.to_string(), None => match scx.active_database() { - Some(id) => id.0, + Some(id) => id.to_string(), None => sql_bail!("no database specified and no active database"), }, Some(ResolvedDatabaseName::Error) => { @@ -292,7 +292,7 @@ pub fn show_schemas<'a>( let query = format!( "SELECT name FROM mz_catalog.mz_schemas - WHERE database_id IS NULL OR database_id = {database_id}", + WHERE database_id IS NULL OR database_id = '{database_id}'", ); ShowSelect::new(scx, query, filter, None, None) } @@ -350,7 +350,7 @@ fn show_connections<'a>( let query = format!( "SELECT name, type FROM mz_catalog.mz_connections - WHERE schema_id = {schema_spec}", + WHERE schema_id = '{schema_spec}'", ); ShowSelect::new(scx, query, filter, None, None) } @@ -364,7 +364,7 @@ fn show_tables<'a>( let query = format!( "SELECT name FROM mz_catalog.mz_tables - WHERE schema_id = {schema_spec}", + WHERE schema_id = '{schema_spec}'", ); ShowSelect::new(scx, query, filter, None, None) } @@ -378,7 +378,7 @@ fn show_sources<'a>( let query = format!( "SELECT name, type, size FROM mz_catalog.mz_sources - WHERE schema_id = {schema_spec}" + WHERE schema_id = '{schema_spec}'" ); ShowSelect::new(scx, query, filter, None, None) } @@ -392,7 +392,7 @@ fn show_views<'a>( let query = format!( "SELECT name FROM mz_catalog.mz_views - WHERE schema_id = {schema_spec}" + WHERE schema_id = '{schema_spec}'" ); ShowSelect::new(scx, query, filter, None, None) } @@ -404,7 +404,7 @@ fn show_materialized_views<'a>( filter: Option>, ) -> Result, PlanError> { let schema_spec = scx.resolve_optional_schema(&from)?; - let mut where_clause = format!("schema_id = {schema_spec}"); + let mut where_clause = format!("schema_id = '{schema_spec}'"); if let Some(cluster) = in_cluster { write!(where_clause, " AND cluster_id = '{}'", cluster.id) @@ -433,7 +433,7 @@ fn show_sinks<'a>( let query = format!( "SELECT sinks.name, sinks.type, sinks.size FROM mz_catalog.mz_sinks AS sinks - WHERE schema_id = {schema_spec}", + WHERE schema_id = 
'{schema_spec}'", ); ShowSelect::new(scx, query, filter, None, None) } @@ -447,7 +447,7 @@ fn show_types<'a>( let query = format!( "SELECT name FROM mz_catalog.mz_types - WHERE schema_id = {schema_spec}", + WHERE schema_id = '{schema_spec}'", ); ShowSelect::new(scx, query, filter, None, None) } @@ -461,7 +461,7 @@ fn show_all_objects<'a>( let query = format!( "SELECT name, type FROM mz_catalog.mz_objects - WHERE schema_id = {schema_spec}", + WHERE schema_id = '{schema_spec}'", ); ShowSelect::new(scx, query, filter, None, None) } @@ -478,7 +478,7 @@ pub fn show_indexes<'a>( if on_object.is_none() && from_schema.is_none() && in_cluster.is_none() { query_filter.push("on_id NOT LIKE 's%'".into()); let schema_spec = scx.resolve_active_schema().map(|spec| spec.clone())?; - query_filter.push(format!("schema_id = {}", schema_spec)); + query_filter.push(format!("schema_id = '{schema_spec}'")); } if let Some(on_object) = &on_object { @@ -499,7 +499,7 @@ pub fn show_indexes<'a>( if let Some(schema) = from_schema { let schema_spec = schema.schema_spec(); - query_filter.push(format!("schema_id = {}", schema_spec)); + query_filter.push(format!("schema_id = '{schema_spec}'")); } if let Some(cluster) = in_cluster { @@ -586,7 +586,7 @@ pub fn show_secrets<'a>( let query = format!( "SELECT name FROM mz_catalog.mz_secrets - WHERE schema_id = {schema_spec}", + WHERE schema_id = '{schema_spec}'", ); ShowSelect::new(scx, query, filter, None, None) diff --git a/src/stash/src/lib.rs b/src/stash/src/lib.rs index 0c844a1fa66d..aef0cefcb340 100644 --- a/src/stash/src/lib.rs +++ b/src/stash/src/lib.rs @@ -945,6 +945,33 @@ where } } + pub fn migrate Option<(K, V)>>(&mut self, f: F) -> Result { + let mut changed = 0; + // Keep a copy of pending in case of uniqueness violation. + let pending = self.pending.clone(); + self.for_values_mut(|p, k, v| { + if let Some((new_k, new_v)) = f(k, v) { + changed += 1; + + // If the key has changed, delete the old entry. 
+ if *k != new_k { + p.insert(k.clone(), None); + } + + // Insert the new key and value. + p.insert(new_k, Some(new_v)); + } + }); + // Check for uniqueness violation. + if let Err(err) = self.verify() { + // Reset our pending set if we have a violation. + self.pending = pending; + Err(err) + } else { + Ok(changed) + } + } + /// Set the value for a key. Returns the previous entry if the key existed, /// otherwise None. /// diff --git a/test/sqllogictest/audit_log.slt b/test/sqllogictest/audit_log.slt index 15ad446a79b8..cda59d8c3662 100644 --- a/test/sqllogictest/audit_log.slt +++ b/test/sqllogictest/audit_log.slt @@ -89,14 +89,14 @@ SELECT id, event_type, object_type, details, user FROM mz_audit_events ORDER BY 4 create cluster-replica {"cluster_id":"u1","cluster_name":"default","logical_size":"1","replica_id":"1","replica_name":"r1"} NULL 5 create role {"id":"u1","name":"default_owner"} NULL 6 create role {"id":"u2","name":"materialize"} materialize -7 create database {"id":"2","name":"test"} materialize -8 create schema {"database_name":"test","id":"6","name":"public"} materialize -9 create schema {"database_name":"test","id":"7","name":"sc1"} materialize -10 create schema {"database_name":"test","id":"8","name":"sc2"} materialize -11 drop schema {"database_name":"test","id":"7","name":"sc1"} materialize -12 drop schema {"database_name":"test","id":"6","name":"public"} materialize -13 drop schema {"database_name":"test","id":"8","name":"sc2"} materialize -14 drop database {"id":"2","name":"test"} materialize +7 create database {"id":"u2","name":"test"} materialize +8 create schema {"database_name":"test","id":"u6","name":"public"} materialize +9 create schema {"database_name":"test","id":"u7","name":"sc1"} materialize +10 create schema {"database_name":"test","id":"u8","name":"sc2"} materialize +11 drop schema {"database_name":"test","id":"u7","name":"sc1"} materialize +12 drop schema {"database_name":"test","id":"u6","name":"public"} materialize +13 drop schema 
{"database_name":"test","id":"u8","name":"sc2"} materialize +14 drop database {"id":"u2","name":"test"} materialize 15 create role {"id":"u3","name":"foo"} materialize 16 drop role {"id":"u3","name":"foo"} materialize 17 create cluster {"id":"u2","name":"foo"} materialize diff --git a/test/sqllogictest/id_reuse.slt b/test/sqllogictest/id_reuse.slt index fbcb1d309c9d..ea92bdf27620 100644 --- a/test/sqllogictest/id_reuse.slt +++ b/test/sqllogictest/id_reuse.slt @@ -17,15 +17,15 @@ reset-server statement ok CREATE SCHEMA foo -query IT rowsort +query TT rowsort SELECT id, name FROM mz_schemas ---- -1 mz_catalog -2 pg_catalog -3 public -4 mz_internal -5 information_schema -6 foo +u1 mz_catalog +u2 pg_catalog +u3 public +u4 mz_internal +u5 information_schema +u6 foo statement ok DROP schema foo @@ -33,24 +33,24 @@ DROP schema foo statement ok CREATE schema bar -query IT rowsort +query TT rowsort SELECT id, name FROM mz_schemas ---- -1 mz_catalog -2 pg_catalog -3 public -4 mz_internal -5 information_schema -7 bar +u1 mz_catalog +u2 pg_catalog +u3 public +u4 mz_internal +u5 information_schema +u7 bar statement ok CREATE DATABASE foo -query IT rowsort +query TT rowsort SELECT id, name FROM mz_databases ---- -1 materialize -2 foo +u1 materialize +u2 foo statement ok DROP DATABASE foo @@ -58,11 +58,11 @@ DROP DATABASE foo statement ok CREATE DATABASE bar -query IT rowsort +query TT rowsort SELECT id, name FROM mz_databases ---- -1 materialize -3 bar +u1 materialize +u3 bar statement ok CREATE ROLE foo diff --git a/test/sqllogictest/secret.slt b/test/sqllogictest/secret.slt index 9af7670e5cfa..b072fd40d101 100644 --- a/test/sqllogictest/secret.slt +++ b/test/sqllogictest/secret.slt @@ -30,11 +30,11 @@ CREATE SECRET secret AS decode('c2VjcmV0Cg==', 'base64'); statement OK CREATE SECRET key AS decode('c2VjcmV0Cg==', 'base64'); -query TIT rowsort +query TTT rowsort SELECT id, schema_id, name FROM mz_secrets ---- -u1 3 secret -u4 3 key +u1 u3 secret +u4 u3 key query T rowsort SHOW 
SECRETS diff --git a/test/sqllogictest/source_sizing.slt b/test/sqllogictest/source_sizing.slt index 1bd627072881..b493716caf2a 100644 --- a/test/sqllogictest/source_sizing.slt +++ b/test/sqllogictest/source_sizing.slt @@ -21,10 +21,10 @@ CREATE SOURCE s2 FROM LOAD GENERATOR COUNTER WITH (SIZE '1'); query TTTTTT SELECT id, schema_id, name, type, connection_id, size FROM mz_sources WHERE id LIKE 'u%' ---- -u2 3 s1 load-generator NULL 1 -u4 3 s2 load-generator NULL 1 -u1 3 s1_progress subsource NULL NULL -u3 3 s2_progress subsource NULL NULL +u2 u3 s1 load-generator NULL 1 +u4 u3 s2 load-generator NULL 1 +u1 u3 s1_progress subsource NULL NULL +u3 u3 s2_progress subsource NULL NULL query TTT SHOW SOURCES diff --git a/test/sqllogictest/system-cluster.slt b/test/sqllogictest/system-cluster.slt index 364417e32cb4..09a9cde11667 100644 --- a/test/sqllogictest/system-cluster.slt +++ b/test/sqllogictest/system-cluster.slt @@ -32,7 +32,7 @@ EXPLAIN SHOW SCHEMAS ---- Explained Query (fast path): Project (#3) - Filter ((#0) IS NULL OR (1 = bigint_to_numeric(#0))) + Filter ((#0) IS NULL OR (#0 = "u1")) ReadExistingIndex mz_internal.mz_show_schemas_ind Used Indexes: @@ -45,8 +45,7 @@ EXPLAIN SHOW CONNECTIONS ---- Explained Query (fast path): Project (#3, #4) - Filter (3 = bigint_to_numeric(#0)) - ReadExistingIndex mz_internal.mz_show_connections_ind + ReadExistingIndex mz_internal.mz_show_connections_ind lookup_value=("u3") Used Indexes: - mz_internal.mz_show_connections_ind @@ -58,8 +57,7 @@ EXPLAIN SHOW TABLES ---- Explained Query (fast path): Project (#3) - Filter (3 = bigint_to_numeric(#0)) - ReadExistingIndex mz_internal.mz_show_tables_ind + ReadExistingIndex mz_internal.mz_show_tables_ind lookup_value=("u3") Used Indexes: - mz_internal.mz_show_tables_ind @@ -71,8 +69,7 @@ EXPLAIN SHOW SOURCES ---- Explained Query (fast path): Project (#3, #4, #6) - Filter (3 = bigint_to_numeric(#0)) - ReadExistingIndex mz_internal.mz_show_sources_ind + ReadExistingIndex 
mz_internal.mz_show_sources_ind lookup_value=("u3") Used Indexes: - mz_internal.mz_show_sources_ind @@ -84,8 +81,7 @@ EXPLAIN SHOW VIEWS ---- Explained Query (fast path): Project (#3) - Filter (3 = bigint_to_numeric(#0)) - ReadExistingIndex mz_internal.mz_show_views_ind + ReadExistingIndex mz_internal.mz_show_views_ind lookup_value=("u3") Used Indexes: - mz_internal.mz_show_views_ind @@ -97,7 +93,7 @@ EXPLAIN SHOW MATERIALIZED VIEWS ---- Explained Query (fast path): Project (#2, #3) - Filter (3 = bigint_to_numeric(#0)) + Filter (#0 = "u3") ReadExistingIndex mz_internal.mz_show_materialized_views_ind Used Indexes: @@ -110,7 +106,7 @@ EXPLAIN SHOW INDEXES ---- Explained Query (fast path): Project (#3..=#6) - Filter NOT("s%" ~~(#0)) AND (3 = bigint_to_numeric(#1)) + Filter NOT("s%" ~~(#0)) AND (#1 = "u3") ReadExistingIndex mz_internal.mz_show_indexes_ind Used Indexes: @@ -123,8 +119,7 @@ EXPLAIN SHOW SINKS ---- Explained Query (fast path): Project (#3, #4, #6) - Filter (3 = bigint_to_numeric(#0)) - ReadExistingIndex mz_internal.mz_show_sinks_ind + ReadExistingIndex mz_internal.mz_show_sinks_ind lookup_value=("u3") Used Indexes: - mz_internal.mz_show_sinks_ind @@ -136,8 +131,7 @@ EXPLAIN SHOW TYPES ---- Explained Query (fast path): Project (#3) - Filter (3 = bigint_to_numeric(#0)) - ReadExistingIndex mz_internal.mz_show_types_ind + ReadExistingIndex mz_internal.mz_show_types_ind lookup_value=("u3") Used Indexes: - mz_internal.mz_show_types_ind @@ -149,8 +143,7 @@ EXPLAIN SHOW OBJECTS ---- Explained Query (fast path): Project (#3, #4) - Filter (3 = bigint_to_numeric(#0)) - ReadExistingIndex mz_internal.mz_show_all_objects_ind + ReadExistingIndex mz_internal.mz_show_all_objects_ind lookup_value=("u3") Used Indexes: - mz_internal.mz_show_all_objects_ind @@ -200,8 +193,7 @@ EXPLAIN SHOW SECRETS ---- Explained Query (fast path): Project (#2) - Filter (3 = bigint_to_numeric(#0)) - ReadExistingIndex mz_internal.mz_show_secrets_ind + ReadExistingIndex 
mz_internal.mz_show_secrets_ind lookup_value=("u3") Used Indexes: - mz_internal.mz_show_secrets_ind diff --git a/test/testdrive/tables.td b/test/testdrive/tables.td index e90b0e1a9c60..587fda812735 100644 --- a/test/testdrive/tables.td +++ b/test/testdrive/tables.td @@ -347,7 +347,7 @@ contains:column "a" specified more than once # Test pg_table_is_visible. > CREATE SCHEMA non_default > CREATE TABLE non_default.hidden (dummy int) -> SELECT name, pg_table_is_visible(oid) AS visible FROM mz_tables WHERE schema_id != 1 AND id LIKE 'u%' +> SELECT name, pg_table_is_visible(oid) AS visible FROM mz_tables WHERE schema_id != 'u1' AND id LIKE 'u%' name visible --------------- hidden false diff --git a/test/upgrade/create-in-v0.27.0-view-on-builtin.td b/test/upgrade/create-in-v0.27.0-view-on-builtin.td index 0e5e8d792792..af8fe93cfae6 100644 --- a/test/upgrade/create-in-v0.27.0-view-on-builtin.td +++ b/test/upgrade/create-in-v0.27.0-view-on-builtin.td @@ -7,4 +7,4 @@ # the Business Source License, use of this software will be governed # by the Apache License, Version 2.0. 
-> CREATE MATERIALIZED VIEW built_in_view (a) AS SELECT AVG(schema_id) FROM mz_catalog.mz_objects; +> CREATE MATERIALIZED VIEW built_in_view (a) AS SELECT AVG(position) FROM mz_catalog.mz_columns; From 6f81b5bfa7657a384bd0a87913a29b431f3acbbd Mon Sep 17 00:00:00 2001 From: Parker Timmerman Date: Thu, 13 Apr 2023 11:01:16 -0400 Subject: [PATCH 2/5] Fix upgrades --- src/adapter/src/catalog/storage.rs | 44 ++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/src/adapter/src/catalog/storage.rs b/src/adapter/src/catalog/storage.rs index 5649713e1b35..a18d161c3cfa 100644 --- a/src/adapter/src/catalog/storage.rs +++ b/src/adapter/src/catalog/storage.rs @@ -182,7 +182,7 @@ async fn migrate( txn.databases.insert( DatabaseKey { id: MATERIALIZE_DATABASE_ID, - ns: Some(DatabaseNamespace::User), + ns: None, }, DatabaseValue { name: "materialize".into(), @@ -210,7 +210,7 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: MZ_CATALOG_SCHEMA_ID, - ns: Some(SchemaNamespace::User), + ns: None, }, SchemaValue { database_id: None, @@ -222,7 +222,7 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: PG_CATALOG_SCHEMA_ID, - ns: Some(SchemaNamespace::User), + ns: None, }, SchemaValue { database_id: None, @@ -234,11 +234,11 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: PUBLIC_SCHEMA_ID, - ns: Some(SchemaNamespace::User), + ns: None, }, SchemaValue { database_id: Some(MATERIALIZE_DATABASE_ID), - database_ns: Some(DatabaseNamespace::User), + database_ns: None, name: "public".into(), owner_id: Some(MZ_SYSTEM_ROLE_ID), }, @@ -265,7 +265,7 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: MZ_INTERNAL_SCHEMA_ID, - ns: Some(SchemaNamespace::User), + ns: None, }, SchemaValue { database_id: None, @@ -277,7 +277,7 @@ async fn migrate( txn.schemas.insert( SchemaKey { id: INFORMATION_SCHEMA_ID, - ns: Some(SchemaNamespace::User), + ns: None, }, SchemaValue { database_id: None, @@ -602,20 +602,40 @@ async fn migrate( } })?; - // 
Migrate all of our SchemaKeys. + // Migrate all of our SchemaKeys and SchemaValues txn.schemas.migrate(|key, value| { - match key.ns { + let new_key = match key.ns { // Set all keys without a namespace to the User namespace. None => { let new_key = SchemaKey { id: key.id, ns: Some(SchemaNamespace::User), }; - - Some((new_key, value.clone())) + Some(new_key) } // If a namespace is already set, there is nothing to do. Some(_) => None, + }; + let new_value = match value.database_ns { + // Set all values without a database namespace to the User namespace. + None => { + let new_value = SchemaValue { + database_id: value.database_id, + database_ns: Some(DatabaseNamespace::User), + name: value.name.clone(), + owner_id: value.owner_id, + }; + Some(new_value) + } + // If a namespace is already set, there is nothing to do. + Some(_) => None, + }; + + match (new_key, new_value) { + (Some(n_k), None) => Some((n_k, value.clone())), + (None, Some(n_v)) => Some((key.clone(), n_v)), + (Some(n_k), Some(n_v)) => Some((n_k, n_v)), + (None, None) => None, } })?; @@ -1494,7 +1514,7 @@ impl<'a> Transaction<'a> { }; if databases.get(&key).is_none() { panic!( - "corrupt stash! unknown database id {id}, for item with key \ + "corrupt stash! 
unknown database id {key:?}, for item with key \ {k:?} and value {v:?}" ); } } From f6c33aeabbc23e156e44a8df50ff311987068663 Mon Sep 17 00:00:00 2001 From: Parker Timmerman Date: Fri, 14 Apr 2023 07:42:00 -0400 Subject: [PATCH 3/5] review feedback, rename methods, remove footguns, add comments --- src/adapter/src/catalog.rs | 6 +++--- src/adapter/src/catalog/storage.rs | 20 +++++++++++++------- src/sql/src/names.rs | 13 +++++++------ src/stash/src/lib.rs | 4 ++++ 4 files changed, 27 insertions(+), 16 deletions(-) diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index 8b34ac437c8c..b53f85869e51 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -4774,8 +4774,8 @@ impl Catalog { public_schema_oid, owner_id, } => { - let database_id = tx.insert_database(&name, owner_id)?; - let schema_id = tx.insert_schema(database_id, DEFAULT_SCHEMA, owner_id)?; + let database_id = tx.insert_user_database(&name, owner_id)?; + let schema_id = tx.insert_user_schema(database_id, DEFAULT_SCHEMA, owner_id)?; state.add_to_audit_log( oracle_write_ts, session, @@ -4850,7 +4850,7 @@ impl Catalog { ))); } }; - let schema_id = tx.insert_schema(database_id, &schema_name, owner_id)?; + let schema_id = tx.insert_user_schema(database_id, &schema_name, owner_id)?; state.add_to_audit_log( oracle_write_ts, session, diff --git a/src/adapter/src/catalog/storage.rs b/src/adapter/src/catalog/storage.rs index a18d161c3cfa..8d3aa39eda04 100644 --- a/src/adapter/src/catalog/storage.rs +++ b/src/adapter/src/catalog/storage.rs @@ -582,8 +582,8 @@ async fn migrate( // // Introduced in v0.51.0 // - // TODO(parkertimmerman): We can make DatabaseKey and SchemaKey more idomatic - enums after this upgrade. + // TODO(parkertimmerman): Once we support more complex migrations, we + should make DatabaseKey and SchemaKey more idiomatic enums. |txn: &mut Transaction<'_>, _now, _bootstrap_args| { // Migrate all of our DatabaseKeys.
txn.databases.migrate(|key, value| { @@ -1495,10 +1495,11 @@ impl<'a> Transaction<'a> { let schemas = self.schemas.items(); let mut items = Vec::new(); self.items.for_values(|k, v| { - let schema = match schemas.get(&SchemaKey { + let schema_key = SchemaKey { id: v.schema_id, ns: v.schema_ns, - }) { + }; + let schema = match schemas.get(&schema_key) { Some(schema) => schema, None => panic!( "corrupt stash! unknown schema id {}, for item with key \ @@ -1522,12 +1523,13 @@ impl<'a> Transaction<'a> { } None => ResolvedDatabaseSpecifier::Ambient, }; + let schema_id = SchemaId::from(schema_key); items.push(( k.gid, QualifiedItemName { qualifiers: ItemQualifiers { database_spec, - schema_spec: SchemaSpecifier::from(v.schema_id), + schema_spec: SchemaSpecifier::from(schema_id), }, item: v.name.clone(), }, @@ -1548,7 +1550,7 @@ impl<'a> Transaction<'a> { .push((StorageUsageKey { metric }, (), 1)); } - pub fn insert_database( + pub fn insert_user_database( &mut self, database_name: &str, owner_id: RoleId, @@ -1573,7 +1575,7 @@ impl<'a> Transaction<'a> { } } - pub fn insert_schema( + pub fn insert_user_schema( &mut self, database_id: DatabaseId, schema_name: &str, @@ -2571,6 +2573,7 @@ pub enum DatabaseNamespace { #[derive(Clone, Copy, Debug, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Hash)] pub struct DatabaseKey { id: u64, + // TODO(parkertimmerman) Remove option in v0.53.0 #[serde(skip_serializing_if = "Option::is_none")] ns: Option, } @@ -2633,6 +2636,7 @@ pub enum SchemaNamespace { #[derive(Clone, Copy, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Hash)] pub struct SchemaKey { id: u64, + // TODO(parkertimmerman) Remove option in v0.53.0 #[serde(skip_serializing_if = "Option::is_none")] ns: Option, } @@ -2664,6 +2668,7 @@ impl From for SchemaKey { #[derive(Clone, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord)] pub struct SchemaValue { database_id: Option, + // TODO(parkertimmerman) Remove option in v0.53.0 #[serde(skip_serializing_if = 
"Option::is_none")] database_ns: Option, name: String, @@ -2679,6 +2684,7 @@ pub struct ItemKey { #[derive(Clone, Deserialize, Serialize, PartialOrd, PartialEq, Eq, Ord, Debug)] pub struct ItemValue { schema_id: u64, + // TODO(parkertimmerman) Remove option in v0.53.0 #[serde(skip_serializing_if = "Option::is_none")] schema_ns: Option, name: String, diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index b7284a913c87..c81c13f73935 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -315,12 +315,13 @@ impl AstDisplay for SchemaSpecifier { } } -impl From for SchemaSpecifier { - fn from(id: u64) -> Self { - if id == Self::TEMPORARY_SCHEMA_ID { - Self::Temporary - } else { - Self::Id(SchemaId::User(id)) +impl From for SchemaSpecifier { + fn from(id: SchemaId) -> SchemaSpecifier { + match id { + SchemaId::User(id) if id == SchemaSpecifier::TEMPORARY_SCHEMA_ID => { + SchemaSpecifier::Temporary + } + schema_id => SchemaSpecifier::Id(schema_id), } } } diff --git a/src/stash/src/lib.rs b/src/stash/src/lib.rs index aef0cefcb340..6bf4ce9e158f 100644 --- a/src/stash/src/lib.rs +++ b/src/stash/src/lib.rs @@ -945,6 +945,10 @@ where } } + /// Migrates k, v pairs. `f` is a function that can return `Some((K, V))` if the value should + /// be updated, otherwise `None`. Returns the number of changed entries. + /// + /// Returns an error if the uniqueness check failed. pub fn migrate Option<(K, V)>>(&mut self, f: F) -> Result { let mut changed = 0; // Keep a copy of pending in case of uniqueness violation. 
From 9b89be9a3eaecfc9a3e74c5e8d3f486a0fa1cd65 Mon Sep 17 00:00:00 2001 From: Parker Timmerman Date: Fri, 14 Apr 2023 13:02:02 -0500 Subject: [PATCH 4/5] get rid of the KeyJson types --- src/adapter/src/catalog.rs | 29 +++++++++++++++++++++- src/sql/src/names.rs | 50 -------------------------------------- 2 files changed, 28 insertions(+), 51 deletions(-) diff --git a/src/adapter/src/catalog.rs b/src/adapter/src/catalog.rs index b53f85869e51..f982ea50e3c4 100644 --- a/src/adapter/src/catalog.rs +++ b/src/adapter/src/catalog.rs @@ -1353,7 +1353,14 @@ impl CatalogState { /// that the serialized state for two identical catalogs will compare /// identically. pub fn dump(&self) -> String { - serde_json::to_string(&self.database_by_id).expect("serialization cannot fail") + // Note: database_by_id is a Map whose keys are not Strings, but serializing + // a Map to JSON requires the keys be strings, hence the mapping here. + let database_by_str: BTreeMap = self + .database_by_id + .iter() + .map(|(key, value)| (key.to_string(), value.debug_json())) + .collect(); + serde_json::to_string(&database_by_str).expect("serialization cannot fail") } pub fn availability_zones(&self) -> &[String] { @@ -1517,6 +1524,26 @@ pub struct Database { pub owner_id: RoleId, } +impl Database { + /// Returns a `Database` formatted as a `serde_json::Value` that is suitable for debugging. For + /// example `CatalogState::dump`. 
+ fn debug_json(&self) -> serde_json::Value { + let schemas_by_str: BTreeMap = self + .schemas_by_id + .iter() + .map(|(key, value)| (key.to_string(), value)) + .collect(); + + serde_json::json!({ + "name": self.name, + "id": self.id, + "schemas_by_id": schemas_by_str, + "schemas_by_name": self.schemas_by_name, + "owner_id": self.owner_id, + }) + } +} + #[derive(Debug, Deserialize, Serialize, Clone)] pub struct Schema { pub name: QualifiedSchemaName, diff --git a/src/sql/src/names.rs b/src/sql/src/names.rs index c81c13f73935..98e347461626 100644 --- a/src/sql/src/names.rs +++ b/src/sql/src/names.rs @@ -621,36 +621,11 @@ impl AstInfo for Aug { /// The identifier for a schema. #[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] -#[serde(into = "SchemaIdJson")] -#[serde(try_from = "SchemaIdJson")] pub enum SchemaId { User(u64), System(u64), } -/// A type that specifies how we serialize and deserialize a [`SchemaId`]. -/// -/// Note: JSON maps require their keys to be strings, and we use [`SchemaId`] as the key -/// in some maps, so we need to serialize them as plain strings, which is what this type -/// does. -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(transparent)] -struct SchemaIdJson(String); - -impl From for SchemaIdJson { - fn from(id: SchemaId) -> SchemaIdJson { - SchemaIdJson(id.to_string()) - } -} - -impl TryFrom for SchemaId { - type Error = PlanError; - - fn try_from(value: SchemaIdJson) -> Result { - SchemaId::from_str(&value.0) - } -} - impl fmt::Display for SchemaId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { @@ -684,36 +659,11 @@ impl FromStr for SchemaId { /// The identifier for a database. 
#[derive(Clone, Copy, Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Serialize, Deserialize)] -#[serde(into = "DatabaseIdJson")] -#[serde(try_from = "DatabaseIdJson")] pub enum DatabaseId { User(u64), System(u64), } -/// A type that specifies how we serialize and deserialize a [`DatabaseId`]. -/// -/// Note: JSON maps require their keys to be strings, and we use [`DatabaseId`] as the key -/// in some maps, so we need to serialize them as plain strings, which is what this type -/// does. -#[derive(Clone, Debug, Serialize, Deserialize)] -#[serde(transparent)] -struct DatabaseIdJson(String); - -impl From for DatabaseIdJson { - fn from(id: DatabaseId) -> DatabaseIdJson { - DatabaseIdJson(id.to_string()) - } -} - -impl TryFrom for DatabaseId { - type Error = PlanError; - - fn try_from(value: DatabaseIdJson) -> Result { - DatabaseId::from_str(&value.0) - } -} - impl fmt::Display for DatabaseId { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { match self { From efdcdd782f32f6992b5730302e0b5ec91c0864ea Mon Sep 17 00:00:00 2001 From: Parker Timmerman <126496446+parker-timmerman@users.noreply.github.com> Date: Mon, 17 Apr 2023 08:50:49 -0400 Subject: [PATCH 5/5] Update src/stash/src/lib.rs Co-authored-by: Matt Jibson --- src/stash/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stash/src/lib.rs b/src/stash/src/lib.rs index 6bf4ce9e158f..410f3ef6eba8 100644 --- a/src/stash/src/lib.rs +++ b/src/stash/src/lib.rs @@ -945,7 +945,7 @@ where } } - /// Migrates k, v pairs. `f` is a function that can return `Some((K, V))` if the value should + /// Migrates k, v pairs. `f` is a function that can return `Some((K, V))` if the key and value should /// be updated, otherwise `None`. Returns the number of changed entries. /// /// Returns an error if the uniqueness check failed.