diff --git a/src/meta/api/src/schema_api_test_suite.rs b/src/meta/api/src/schema_api_test_suite.rs index 89f73a331b8d4..8cc2523c5189a 100644 --- a/src/meta/api/src/schema_api_test_suite.rs +++ b/src/meta/api/src/schema_api_test_suite.rs @@ -115,6 +115,7 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyAction; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyAction; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableCopiedFileInfo; use databend_common_meta_app::schema::TableCopiedFileNameIdent; use databend_common_meta_app::schema::TableId; @@ -309,6 +310,7 @@ impl SchemaApiTestSuite { .drop_table_without_table_id_list(&b.build().await) .await?; suite.table_rename(&b.build().await).await?; + suite.table_swap(&b.build().await).await?; suite.table_update_meta(&b.build().await).await?; suite.table_update_mask_policy(&b.build().await).await?; suite @@ -2367,6 +2369,179 @@ impl SchemaApiTestSuite { Ok(()) } + #[fastrace::trace] + async fn table_swap>( + &self, + mt: &MT, + ) -> anyhow::Result<()> { + let tenant_name = "tenant1"; + let tenant = Tenant::new_or_err(tenant_name, func_name!())?; + + let db1_name = "db1"; + let tb1_name = "table_a"; + let tb2_name = "table_b"; + + let schema = || { + Arc::new(TableSchema::new(vec![TableField::new( + "number", + TableDataType::Number(NumberDataType::UInt64), + )])) + }; + + let _table_meta = |created_on| TableMeta { + schema: schema(), + engine: "JSON".to_string(), + options: maplit::btreemap! {"opt-1".into() => "val-1".into()}, + created_on, + ..TableMeta::default() + }; + + let swap_req = |if_exists| SwapTableReq { + if_exists, + origin_table: TableNameIdent { + tenant: tenant.clone(), + db_name: db1_name.to_string(), + table_name: tb1_name.to_string(), + }, + target_table_name: tb2_name.to_string(), + }; + + info!("--- swap tables on unknown db"); + { + let got = mt.swap_table(swap_req(false)).await; + debug!("--- swap tables on unknown database got: {:?}", got); + + assert!(got.is_err()); + assert_eq!( + ErrorCode::UNKNOWN_DATABASE, + ErrorCode::from(got.unwrap_err()).code() + ); + } + + info!("--- prepare db and tables"); + let created_on = Utc::now(); + let mut util1 = Util::new(mt, tenant_name, db1_name, tb1_name, "JSON"); + util1.create_db().await?; + + let mut util2 = Util::new(mt, tenant_name, db1_name, tb2_name, "JSON"); + + info!("--- create table_a"); + let tb1_ident = { + let old_db = util1.get_database().await?; + let (table_id, _table_meta) = util1 + .create_table_with( + |mut meta| { + meta.schema = schema(); + meta.created_on = created_on; + meta + }, + |req| req, + ) + .await?; + let cur_db = util1.get_database().await?; + assert!(old_db.meta.seq < cur_db.meta.seq); + let got = util1.get_table().await?; + assert_eq!(table_id, got.ident.table_id); + got.ident + }; + + info!("--- create table_b"); + let tb2_ident = { + let old_db = util2.get_database().await?; + let (table_id, _table_meta) = util2 + .create_table_with( + |mut meta| { + meta.schema = schema(); + meta.created_on = created_on; + meta + }, + |req| req, + ) + .await?; + let cur_db = util2.get_database().await?; + assert!(old_db.meta.seq < cur_db.meta.seq); + let got = util2.get_table().await?; + assert_eq!(table_id, got.ident.table_id); + got.ident + }; + + info!("--- swap tables, both exist, ok"); + { + let old_db = util1.get_database().await?; + let _reply = mt.swap_table(swap_req(false)).await?; + let cur_db = util1.get_database().await?; + assert!(old_db.meta.seq < cur_db.meta.seq); + + // Verify tables are swapped + let got_a = mt + .get_table((tenant_name, db1_name, tb1_name).into()) + .await?; + let got_b = mt + .get_table((tenant_name, db1_name, tb2_name).into()) + .await?; + + // table_a name should now point to table_b's id + assert_eq!(tb2_ident.table_id, got_a.ident.table_id); + // table_b name should now point to table_a's id + assert_eq!(tb1_ident.table_id, got_b.ident.table_id); + } + + info!("--- swap tables again, should restore original mapping"); + { + let _reply = mt.swap_table(swap_req(false)).await?; + + // Verify tables are swapped back + let got_a = mt + .get_table((tenant_name, db1_name, tb1_name).into()) + .await?; + let got_b = mt + .get_table((tenant_name, db1_name, tb2_name).into()) + .await?; + + // table_a name should point back to table_a's id + assert_eq!(tb1_ident.table_id, got_a.ident.table_id); + // table_b name should point back to table_b's id + assert_eq!(tb2_ident.table_id, got_b.ident.table_id); + } + + info!("--- swap non-existent table with if_exists=false, error"); + { + let swap_req_nonexist = SwapTableReq { + if_exists: false, + origin_table: TableNameIdent { + tenant: tenant.clone(), + db_name: db1_name.to_string(), + table_name: "non_existent".to_string(), + }, + target_table_name: tb2_name.to_string(), + }; + + let got = mt.swap_table(swap_req_nonexist).await; + assert!(got.is_err()); + assert_eq!( + ErrorCode::UNKNOWN_TABLE, + ErrorCode::from(got.unwrap_err()).code() + ); + } + + info!("--- swap non-existent table with if_exists=true, ok"); + { + let swap_req_nonexist = SwapTableReq { + if_exists: true, + origin_table: TableNameIdent { + tenant: tenant.clone(), + db_name: db1_name.to_string(), + table_name: "non_existent".to_string(), + }, + target_table_name: tb2_name.to_string(), + }; + + assert!(mt.swap_table(swap_req_nonexist).await.is_ok()); + } + + Ok(()) + } + #[fastrace::trace] async fn table_update_meta>( &self, diff --git a/src/meta/api/src/table_api.rs b/src/meta/api/src/table_api.rs index 066defcb48b43..76fa26188d55a 100644 --- a/src/meta/api/src/table_api.rs +++ b/src/meta/api/src/table_api.rs @@ -69,6 +69,8 @@ use databend_common_meta_app::schema::ListDroppedTableResp; use databend_common_meta_app::schema::ListTableReq; use databend_common_meta_app::schema::RenameTableReply; use databend_common_meta_app::schema::RenameTableReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableCopiedFileNameIdent; use databend_common_meta_app::schema::TableId; use databend_common_meta_app::schema::TableIdHistoryIdent; @@ -713,6 +715,192 @@ where } } + #[logcall::logcall] + #[fastrace::trace] + async fn swap_table(&self, req: SwapTableReq) -> Result { + debug!(req :? =(&req); "SchemaApi: {}", func_name!()); + + let mut trials = txn_backoff(None, func_name!()); + loop { + trials.next().unwrap()?.await; + + // Get databases + let tenant_dbname_left = req.origin_table.db_name_ident(); + + let (seq_db_id_left, db_meta_left) = + get_db_or_err(self, &tenant_dbname_left, "swap_table: tenant_dbname_left").await?; + + let dbid_tbname_left = DBIdTableName { + db_id: *seq_db_id_left.data, + table_name: req.origin_table.table_name.clone(), + }; + + let (tb_id_seq_left, table_id_left) = get_u64_value(self, &dbid_tbname_left).await?; + if req.if_exists && tb_id_seq_left == 0 { + return Ok(SwapTableReply {}); + } + assert_table_exist( + tb_id_seq_left, + &req.origin_table, + "swap_table: origin_table", + )?; + + let dbid_tbname_right = DBIdTableName { + db_id: *seq_db_id_left.data, + table_name: req.target_table_name.clone(), + }; + + let (tb_id_seq_right, table_id_right) = get_u64_value(self, &dbid_tbname_right).await?; + if req.if_exists && tb_id_seq_right == 0 { + return Ok(SwapTableReply {}); + } + assert_table_exist( + tb_id_seq_right, + &TableNameIdent { + tenant: req.origin_table.tenant.clone(), + db_name: req.origin_table.db_name.clone(), + table_name: req.target_table_name.clone(), + }, + "swap_table: target_table", + )?; + + // Get table id lists + let dbid_tbname_idlist_left = TableIdHistoryIdent { + database_id: *seq_db_id_left.data, + table_name: req.origin_table.table_name.clone(), + }; + let dbid_tbname_idlist_right = TableIdHistoryIdent { + database_id: *seq_db_id_left.data, + table_name: req.target_table_name.clone(), + }; + + let seq_table_history_left = self.get_pb(&dbid_tbname_idlist_left).await?; + let seq_table_history_right = self.get_pb(&dbid_tbname_idlist_right).await?; + + let tb_id_list_seq_left = seq_table_history_left.seq(); + let tb_id_list_seq_right = seq_table_history_right.seq(); + + let mut tb_id_list_left = seq_table_history_left + .into_value() + .unwrap_or_else(|| TableIdList::new_with_ids([table_id_left])); + let mut tb_id_list_right = seq_table_history_right + .into_value() + .unwrap_or_else(|| TableIdList::new_with_ids([table_id_right])); + + // Validate table IDs in history lists + { + let last_left = tb_id_list_left.last().copied(); + if Some(table_id_left) != last_left { + let err_message = format!( + "swap_table {:?} but last table id conflict, id list last: {:?}, current: {}", + req.origin_table, last_left, table_id_left + ); + error!("{}", err_message); + return Err(KVAppError::AppError(AppError::UnknownTable( + UnknownTable::new(&req.origin_table.table_name, err_message), + ))); + } + + let last_right = tb_id_list_right.last().copied(); + if Some(table_id_right) != last_right { + let err_message = format!( + "swap_table {:?} but last table id conflict, id list last: {:?}, current: {}", + req.target_table_name, last_right, table_id_right + ); + error!("{}", err_message); + return Err(KVAppError::AppError(AppError::UnknownTable( + UnknownTable::new(&req.target_table_name, err_message), + ))); + } + } + + // Get table id to name mappings + let table_id_to_name_key_left = TableIdToName { + table_id: table_id_left, + }; + let table_id_to_name_key_right = TableIdToName { + table_id: table_id_right, + }; + let table_id_to_name_seq_left = self.get_seq(&table_id_to_name_key_left).await?; + let table_id_to_name_seq_right = self.get_seq(&table_id_to_name_key_right).await?; + + // Prepare new mappings after swap + let db_id_table_name_left = DBIdTableName { + db_id: *seq_db_id_left.data, + table_name: req.origin_table.table_name.clone(), + }; + let db_id_table_name_right = DBIdTableName { + db_id: *seq_db_id_left.data, + table_name: req.target_table_name.clone(), + }; + + { + // Update history lists: remove current table IDs + tb_id_list_left.pop(); + tb_id_list_right.pop(); + // Add swapped table IDs + tb_id_list_left.append(table_id_right); + tb_id_list_right.append(table_id_left); + + let txn = TxnRequest::new( + vec![ + // Ensure databases haven't changed + txn_cond_seq(&seq_db_id_left.data, Eq, db_meta_left.seq), + // Ensure table name->table_id mappings haven't changed + txn_cond_seq(&dbid_tbname_left, Eq, tb_id_seq_left), + txn_cond_seq(&dbid_tbname_right, Eq, tb_id_seq_right), + // Ensure table history lists haven't changed + txn_cond_seq(&dbid_tbname_idlist_left, Eq, tb_id_list_seq_left), + txn_cond_seq(&dbid_tbname_idlist_right, Eq, tb_id_list_seq_right), + // Ensure table_id->name mappings haven't changed + txn_cond_seq(&table_id_to_name_key_left, Eq, table_id_to_name_seq_left), + txn_cond_seq(&table_id_to_name_key_right, Eq, table_id_to_name_seq_right), + ], + vec![ + // Swap table name->table_id mappings + txn_op_put(&dbid_tbname_left, serialize_u64(table_id_right)?), /* origin_table_name -> target_table_id */ + txn_op_put(&dbid_tbname_right, serialize_u64(table_id_left)?), /* target_table_name -> origin_table_id */ + // Update database metadata sequences + txn_op_put(&seq_db_id_left.data, serialize_struct(&*db_meta_left)?), + // Update table history lists + txn_op_put( + &dbid_tbname_idlist_left, + serialize_struct(&tb_id_list_left)?, + ), + txn_op_put( + &dbid_tbname_idlist_right, + serialize_struct(&tb_id_list_right)?, + ), + // Update table_id->name mappings + txn_op_put( + &table_id_to_name_key_left, + serialize_struct(&db_id_table_name_right)?, + ), // origin_table_id -> target_table_name + txn_op_put( + &table_id_to_name_key_right, + serialize_struct(&db_id_table_name_left)?, + ), // target_table_id -> origin_table_name + ], + ); + + let (succ, _responses) = send_txn(self, txn).await?; + + debug!( + origin_table :? =(&req.origin_table), + target_table_name :? =(&req.target_table_name), + table_id_left :? =(&table_id_left), + table_id_right :? =(&table_id_right), + succ = succ; + "swap_table" + ); + + if succ { + return Ok(SwapTableReply {}); + } + } + } + } + #[logcall::logcall] #[fastrace::trace] async fn truncate_table( diff --git a/src/meta/app/src/schema/mod.rs b/src/meta/app/src/schema/mod.rs index 7f5b95ca16506..2c75f1bef89c1 100644 --- a/src/meta/app/src/schema/mod.rs +++ b/src/meta/app/src/schema/mod.rs @@ -116,6 +116,8 @@ pub use table::SetTableColumnMaskPolicyReq; pub use table::SetTableRowAccessPolicyAction; pub use table::SetTableRowAccessPolicyReply; pub use table::SetTableRowAccessPolicyReq; +pub use table::SwapTableReply; +pub use table::SwapTableReq; pub use table::TableCopiedFileInfo; pub use table::TableCopiedFileNameIdent; pub use table::TableId; diff --git a/src/meta/app/src/schema/table.rs b/src/meta/app/src/schema/table.rs index 800ee0165dc6a..2b9a872d8c458 100644 --- a/src/meta/app/src/schema/table.rs +++ b/src/meta/app/src/schema/table.rs @@ -816,6 +816,21 @@ impl RenameTableReq { } } +impl Display for SwapTableReq { + fn fmt(&self, f: &mut Formatter) -> fmt::Result { + write!( + f, + "swap_table:{}/{}-{}<==>{}/{}-{}", + self.origin_table.tenant.tenant_name(), + self.origin_table.db_name, + self.origin_table.table_name, + self.origin_table.tenant.tenant_name(), + self.origin_table.db_name, + self.target_table_name + ) + } +} + impl Display for RenameTableReq { fn fmt(&self, f: &mut Formatter) -> fmt::Result { write!( @@ -835,6 +850,23 @@ pub struct RenameTableReply { pub table_id: u64, } +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SwapTableReq { + pub if_exists: bool, + pub origin_table: TableNameIdent, + pub target_table_name: String, +} + +impl SwapTableReq { + pub fn tenant(&self) -> &Tenant { + &self.origin_table.tenant + } +} + +// Keep this structure for future compatibility, even if currently empty. +#[derive(Clone, Debug, PartialEq, Eq)] +pub struct SwapTableReply {} + #[derive(Clone, Debug, PartialEq, Eq)] pub struct UpsertTableOptionReq { pub table_id: u64, diff --git a/src/query/ast/src/ast/statements/table.rs b/src/query/ast/src/ast/statements/table.rs index 4e7f8fa8e8684..3e887fb29af12 100644 --- a/src/query/ast/src/ast/statements/table.rs +++ b/src/query/ast/src/ast/statements/table.rs @@ -439,6 +439,9 @@ pub enum AlterTableAction { RenameTable { new_table: Identifier, }, + SwapWith { + target_table: Identifier, + }, AddColumn { column: ColumnDefinition, option: AddColumnOption, @@ -508,6 +511,9 @@ impl Display for AlterTableAction { AlterTableAction::RenameTable { new_table } => { write!(f, "RENAME TO {new_table}")?; } + AlterTableAction::SwapWith { target_table } => { + write!(f, "SWAP WITH {target_table}")?; + } AlterTableAction::ModifyTableComment { new_comment } => { write!(f, "COMMENT={}", QuotedString(new_comment, '\''))?; } diff --git a/src/query/ast/src/parser/statement.rs b/src/query/ast/src/parser/statement.rs index b5268ec985362..79b428a10a3af 100644 --- a/src/query/ast/src/parser/statement.rs +++ b/src/query/ast/src/parser/statement.rs @@ -4098,6 +4098,12 @@ pub fn alter_table_action(i: Input) -> IResult { }, |(_, _, new_table)| AlterTableAction::RenameTable { new_table }, ); + let swap_with = map( + rule! { + SWAP ~ WITH ~ #ident + }, + |(_, _, target_table)| AlterTableAction::SwapWith { target_table }, + ); let rename_column = map( rule! { RENAME ~ COLUMN? ~ #ident ~ TO ~ #ident @@ -4246,6 +4252,7 @@ pub fn alter_table_action(i: Input) -> IResult { | #drop_table_cluster_key | #drop_constraint | #rename_table + | #swap_with | #rename_column | #modify_table_comment | #add_column diff --git a/src/query/ast/src/parser/token.rs b/src/query/ast/src/parser/token.rs index 4d5842aa83405..37e8f628b0468 100644 --- a/src/query/ast/src/parser/token.rs +++ b/src/query/ast/src/parser/token.rs @@ -1168,6 +1168,8 @@ pub enum TokenKind { SHARES, #[token("SUPER", ignore(ascii_case))] SUPER, + #[token("SWAP", ignore(ascii_case))] + SWAP, #[token("STATUS", ignore(ascii_case))] STATUS, #[token("STORED", ignore(ascii_case))] diff --git a/src/query/ast/tests/it/parser.rs b/src/query/ast/tests/it/parser.rs index c13fb31166014..10a74a44f95c3 100644 --- a/src/query/ast/tests/it/parser.rs +++ b/src/query/ast/tests/it/parser.rs @@ -295,6 +295,7 @@ SELECT * from s;"#, r#"OPTIMIZE TABLE t PURGE BEFORE (SNAPSHOT => '9828b23f74664ff3806f44bbc1925ea5') LIMIT 10;"#, r#"OPTIMIZE TABLE t PURGE BEFORE (TIMESTAMP => '2023-06-26 09:49:02.038483'::TIMESTAMP) LIMIT 10;"#, r#"ALTER TABLE t CLUSTER BY(c1);"#, + r#"ALTER TABLE t1 swap with t2;"#, r#"ALTER TABLE t refresh cache;"#, r#"ALTER TABLE t COMMENT='t1-commnet';"#, r#"ALTER TABLE t DROP CLUSTER KEY;"#, diff --git a/src/query/ast/tests/it/testdata/stmt.txt b/src/query/ast/tests/it/testdata/stmt.txt index 83aed60f959e9..e202d3eb23d78 100644 --- a/src/query/ast/tests/it/testdata/stmt.txt +++ b/src/query/ast/tests/it/testdata/stmt.txt @@ -13687,6 +13687,49 @@ AlterTable( ) +---------- Input ---------- +ALTER TABLE t1 swap with t2; +---------- Output --------- +ALTER TABLE t1 SWAP WITH t2 +---------- AST ------------ +AlterTable( + AlterTableStmt { + if_exists: false, + table_reference: Table { + span: Some( + 12..14, + ), + catalog: None, + database: None, + table: Identifier { + span: Some( + 12..14, + ), + name: "t1", + quote: None, + ident_type: None, + }, + alias: None, + temporal: None, + with_options: None, + pivot: None, + unpivot: None, + sample: None, + }, + action: SwapWith { + target_table: Identifier { + span: Some( + 25..27, + ), + name: "t2", + quote: None, + ident_type: None, + }, + }, + }, +) + + ---------- Input ---------- ALTER TABLE t refresh cache; ---------- Output --------- diff --git a/src/query/catalog/src/catalog/interface.rs b/src/query/catalog/src/catalog/interface.rs index bf7ba22394167..7ab0219453b75 100644 --- a/src/query/catalog/src/catalog/interface.rs +++ b/src/query/catalog/src/catalog/interface.rs @@ -86,6 +86,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TruncateTableReply; @@ -344,6 +346,8 @@ pub trait Catalog: DynClone + Send + Sync + Debug { async fn rename_table(&self, req: RenameTableReq) -> Result; + async fn swap_table(&self, req: SwapTableReq) -> Result; + // Check a db.table is exists or not. #[async_backtrace::framed] async fn exists_table(&self, tenant: &Tenant, db_name: &str, table_name: &str) -> Result { diff --git a/src/query/catalog/src/database.rs b/src/query/catalog/src/database.rs index 768f065af8dac..049ecf8f19699 100644 --- a/src/query/catalog/src/database.rs +++ b/src/query/catalog/src/database.rs @@ -30,6 +30,8 @@ use databend_common_meta_app::schema::RenameTableReply; use databend_common_meta_app::schema::RenameTableReq; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TruncateTableReply; use databend_common_meta_app::schema::TruncateTableReq; @@ -176,6 +178,14 @@ pub trait Database: DynClone + Sync + Send { ))) } + #[async_backtrace::framed] + async fn swap_table(&self, _req: SwapTableReq) -> Result { + Err(ErrorCode::Unimplemented(format!( + "UnImplement swap_table in {} Database", + self.name() + ))) + } + #[async_backtrace::framed] async fn upsert_table_option( &self, diff --git a/src/query/service/src/catalogs/default/database_catalog.rs b/src/query/service/src/catalogs/default/database_catalog.rs index 68724d2b5c619..24191e2e5857a 100644 --- a/src/query/service/src/catalogs/default/database_catalog.rs +++ b/src/query/service/src/catalogs/default/database_catalog.rs @@ -91,6 +91,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TruncateTableReply; @@ -553,6 +555,23 @@ impl Catalog for DatabaseCatalog { self.mutable_catalog.rename_table(req).await } + #[async_backtrace::framed] + async fn swap_table(&self, req: SwapTableReq) -> Result { + info!("Swap table from req:{:?}", req); + + if self + .immutable_catalog + .exists_database(req.tenant(), &req.origin_table.db_name) + .await? + { + return Err(ErrorCode::Unimplemented( + "Cannot swap tables from(to) system databases", + )); + } + + self.mutable_catalog.swap_table(req).await + } + #[async_backtrace::framed] async fn create_table_index(&self, req: CreateTableIndexReq) -> Result<()> { self.mutable_catalog.create_table_index(req).await diff --git a/src/query/service/src/catalogs/default/immutable_catalog.rs b/src/query/service/src/catalogs/default/immutable_catalog.rs index ebe1f3cf774da..8dd2c2df88cbc 100644 --- a/src/query/service/src/catalogs/default/immutable_catalog.rs +++ b/src/query/service/src/catalogs/default/immutable_catalog.rs @@ -78,6 +78,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TruncateTableReply; @@ -382,6 +384,13 @@ impl Catalog for ImmutableCatalog { )) } + #[async_backtrace::framed] + async fn swap_table(&self, _req: SwapTableReq) -> Result { + Err(ErrorCode::Unimplemented( + "Cannot swap table in system database", + )) + } + async fn commit_table_meta(&self, _req: CommitTableMetaReq) -> Result { Err(ErrorCode::Unimplemented( "cannot commit_table_meta in system database", diff --git a/src/query/service/src/catalogs/default/mutable_catalog.rs b/src/query/service/src/catalogs/default/mutable_catalog.rs index 18046450f8e2f..b2fbb6ba97f11 100644 --- a/src/query/service/src/catalogs/default/mutable_catalog.rs +++ b/src/query/service/src/catalogs/default/mutable_catalog.rs @@ -106,6 +106,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; @@ -687,6 +689,14 @@ impl Catalog for MutableCatalog { db.rename_table(req).await } + #[async_backtrace::framed] + async fn swap_table(&self, req: SwapTableReq) -> Result { + let db = self + .get_database(&req.origin_table.tenant, &req.origin_table.db_name) + .await?; + db.swap_table(req).await + } + #[async_backtrace::framed] async fn upsert_table_option( &self, diff --git a/src/query/service/src/catalogs/default/session_catalog.rs b/src/query/service/src/catalogs/default/session_catalog.rs index c387aa7ad0997..3f7b63d4f6ea2 100644 --- a/src/query/service/src/catalogs/default/session_catalog.rs +++ b/src/query/service/src/catalogs/default/session_catalog.rs @@ -87,6 +87,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TruncateTableReply; @@ -470,6 +472,10 @@ impl Catalog for SessionCatalog { } } + async fn swap_table(&self, req: SwapTableReq) -> Result { + self.inner.swap_table(req).await + } + async fn upsert_table_option( &self, tenant: &Tenant, diff --git a/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs b/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs index 2caf122b71404..6132de1257a40 100644 --- a/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs +++ b/src/query/service/src/catalogs/iceberg/iceberg_catalog.rs @@ -77,6 +77,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TruncateTableReply; @@ -433,6 +435,13 @@ impl Catalog for IcebergCatalog { self.iceberg_catalog.rename_table(req).await } + #[async_backtrace::framed] + async fn swap_table(&self, _req: SwapTableReq) -> Result { + Err(ErrorCode::Unimplemented( + "Cannot swap table in iceberg catalog", + )) + } + #[async_backtrace::framed] async fn upsert_table_option( &self, diff --git a/src/query/service/src/databases/default/default_database.rs b/src/query/service/src/databases/default/default_database.rs index c5c6b3f7168c8..8d3fecd05eeb9 100644 --- a/src/query/service/src/databases/default/default_database.rs +++ b/src/query/service/src/databases/default/default_database.rs @@ -37,6 +37,8 @@ use databend_common_meta_app::schema::RenameTableReply; use databend_common_meta_app::schema::RenameTableReq; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableIdHistoryIdent; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; @@ -259,6 +261,12 @@ impl Database for DefaultDatabase { Ok(res) } + #[async_backtrace::framed] + async fn swap_table(&self, req: SwapTableReq) -> Result { + let res = self.ctx.meta.swap_table(req).await?; + Ok(res) + } + #[async_backtrace::framed] async fn upsert_table_option( &self, diff --git a/src/query/service/src/interpreters/access/privilege_access.rs b/src/query/service/src/interpreters/access/privilege_access.rs index c839d99b832fe..7b828211f5b8c 100644 --- a/src/query/service/src/interpreters/access/privilege_access.rs +++ b/src/query/service/src/interpreters/access/privilege_access.rs @@ -1138,6 +1138,17 @@ impl AccessChecker for PrivilegeAccess { } self.validate_db_access(&plan.catalog, &plan.new_database, UserPrivilegeType::Create, false).await?; } + Plan::SwapTable(plan) => { + // only the current role have OWNERSHIP privileges on the tables can execute swap. + let session = self.ctx.get_current_session(); + let origin_table_owner = self.has_ownership(&session, &GrantObject::Table(plan.catalog.clone(), plan.database.clone(), plan.table.clone()), true, false).await?; + let target_table_owner = self.has_ownership(&session, &GrantObject::Table(plan.catalog.clone(), plan.database.clone(), plan.target_table.clone()), true, false).await?; + return if target_table_owner && origin_table_owner { + Ok(()) + } else { + Err(ErrorCode::PermissionDenied("Insufficient privileges: only the table owner can perform this operation")) + } + } Plan::SetOptions(plan) => { self.validate_table_access(&plan.catalog, &plan.database, &plan.table, UserPrivilegeType::Alter, false, false).await? } diff --git a/src/query/service/src/interpreters/interpreter_factory.rs b/src/query/service/src/interpreters/interpreter_factory.rs index 4e2cc282cec8d..1dedccb3d4c25 100644 --- a/src/query/service/src/interpreters/interpreter_factory.rs +++ b/src/query/service/src/interpreters/interpreter_factory.rs @@ -343,6 +343,10 @@ impl InterpreterFactory { ctx, *rename_table.clone(), )?)), + Plan::SwapTable(swap_table) => Ok(Arc::new(SwapTableInterpreter::try_create( + ctx, + *swap_table.clone(), + )?)), Plan::SetOptions(set_options) => Ok(Arc::new(SetOptionsInterpreter::try_create( ctx, *set_options.clone(), diff --git a/src/query/service/src/interpreters/interpreter_table_swap.rs b/src/query/service/src/interpreters/interpreter_table_swap.rs new file mode 100644 index 0000000000000..5b8e7d3cf7554 --- /dev/null +++ b/src/query/service/src/interpreters/interpreter_table_swap.rs @@ -0,0 +1,77 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_meta_app::schema::SwapTableReq; +use databend_common_meta_app::schema::TableNameIdent; +use databend_common_sql::plans::SwapTablePlan; + +use crate::interpreters::Interpreter; +use crate::pipelines::PipelineBuildResult; +use crate::sessions::QueryContext; +use crate::sessions::TableContext; + +pub struct SwapTableInterpreter { + ctx: Arc, + plan: SwapTablePlan, +} + +impl SwapTableInterpreter { + pub fn try_create(ctx: Arc, plan: SwapTablePlan) -> Result { + Ok(SwapTableInterpreter { ctx, plan }) + } +} + +#[async_trait::async_trait] +impl Interpreter for SwapTableInterpreter { + fn name(&self) -> &str { + "SwapTableInterpreter" + } + + fn is_ddl(&self) -> bool { + true + } + + #[async_backtrace::framed] + async fn execute2(&self) -> Result { + let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; + let tenant = self.plan.tenant.clone(); + let db_name = self.plan.database.clone(); + let table_name = self.plan.table.clone(); + let target_table_name = self.plan.target_table.clone(); + let origin_table = catalog.get_table(&tenant, &db_name, &table_name).await?; + let target_table = catalog + .get_table(&tenant, &db_name, &target_table_name) + .await?; + if origin_table.is_temp() || target_table.is_temp() { + return Err(ErrorCode::AlterTableError("Can not swap temp table")); + } + let _resp = catalog + .swap_table(SwapTableReq { + if_exists: self.plan.if_exists, + origin_table: TableNameIdent { + tenant: self.plan.tenant.clone(), + db_name: self.plan.database.clone(), + table_name: self.plan.table.clone(), + }, + target_table_name: self.plan.target_table.clone(), + }) + .await?; + + Ok(PipelineBuildResult::create()) + } +} diff --git a/src/query/service/src/interpreters/mod.rs b/src/query/service/src/interpreters/mod.rs index 968ebd9ac3410..4fb7a370e6bdc 100644 --- a/src/query/service/src/interpreters/mod.rs +++ b/src/query/service/src/interpreters/mod.rs @@ -144,6 +144,7 @@ mod interpreter_table_row_access_drop; mod interpreter_table_row_access_drop_all; mod interpreter_table_set_options; mod interpreter_table_show_create; +mod interpreter_table_swap; mod interpreter_table_truncate; mod interpreter_table_undrop; mod interpreter_table_unset_options; @@ -261,6 +262,7 @@ pub use interpreter_table_row_access_drop::DropTableRowAccessPolicyInterpreter; pub use interpreter_table_row_access_drop_all::DropAllTableRowAccessPoliciesInterpreter; pub use interpreter_table_show_create::ShowCreateQuerySettings; pub use interpreter_table_show_create::ShowCreateTableInterpreter; +pub use interpreter_table_swap::SwapTableInterpreter; pub use interpreter_table_truncate::TruncateTableInterpreter; pub use interpreter_table_undrop::UndropTableInterpreter; pub use interpreter_table_vacuum::VacuumTableInterpreter; diff --git a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs index 3307f22e81912..931f9cdf7a4b8 100644 --- a/src/query/service/tests/it/sql/exec/get_table_bind_test.rs +++ b/src/query/service/tests/it/sql/exec/get_table_bind_test.rs @@ -113,6 +113,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TruncateTableReply; @@ -299,6 +301,10 @@ impl Catalog for FakedCatalog { todo!() } + async fn swap_table(&self, _req: SwapTableReq) -> Result { + todo!() + } + async fn upsert_table_option( &self, _tenant: &Tenant, diff --git a/src/query/service/tests/it/storages/fuse/operations/commit.rs b/src/query/service/tests/it/storages/fuse/operations/commit.rs index f3c2bfcc0d3c4..db747b6b20e4d 100644 --- a/src/query/service/tests/it/storages/fuse/operations/commit.rs +++ b/src/query/service/tests/it/storages/fuse/operations/commit.rs @@ -109,6 +109,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TruncateTableReply; @@ -1059,6 +1061,10 @@ impl Catalog for FakedCatalog { todo!() } + async fn swap_table(&self, _req: SwapTableReq) -> Result { + todo!() + } + async fn upsert_table_option( &self, _tenant: &Tenant, diff --git a/src/query/sql/src/planner/binder/ddl/table.rs b/src/query/sql/src/planner/binder/ddl/table.rs index 453d77ec18504..24ecac399bac5 100644 --- a/src/query/sql/src/planner/binder/ddl/table.rs +++ b/src/query/sql/src/planner/binder/ddl/table.rs @@ -151,6 +151,7 @@ use crate::plans::RevertTablePlan; use crate::plans::RewriteKind; use crate::plans::SetOptionsPlan; use crate::plans::ShowCreateTablePlan; +use crate::plans::SwapTablePlan; use crate::plans::TruncateTablePlan; use crate::plans::UndropTablePlan; use crate::plans::UnsetOptionsPlan; @@ -1043,6 +1044,17 @@ impl Binder { table, }))) } + AlterTableAction::SwapWith { target_table } => { + Ok(Plan::SwapTable(Box::new(SwapTablePlan { + tenant, + if_exists: *if_exists, + catalog: catalog.clone(), + database: database.clone(), + table: table.clone(), + target_table: normalize_identifier(target_table, &self.name_resolution_ctx) + .name, + }))) + } AlterTableAction::ModifyTableComment { new_comment } => { Ok(Plan::ModifyTableComment(Box::new(ModifyTableCommentPlan { if_exists: *if_exists, diff --git a/src/query/sql/src/planner/format/display_plan.rs b/src/query/sql/src/planner/format/display_plan.rs index a5c4c7b451e4b..0be5dbcac16b0 100644 --- a/src/query/sql/src/planner/format/display_plan.rs +++ b/src/query/sql/src/planner/format/display_plan.rs @@ -81,6 +81,7 @@ impl Plan { Plan::UndropTable(_) => Ok("UndropTable".to_string()), Plan::DescribeTable(_) => Ok("DescribeTable".to_string()), Plan::RenameTable(_) => Ok("RenameTable".to_string()), + Plan::SwapTable(_) => Ok("SwapTable".to_string()), Plan::ModifyTableComment(_) => Ok("ModifyTableComment".to_string()), Plan::ModifyTableConnection(_) => Ok("ModifyTableConnection".to_string()), Plan::SetOptions(_) => Ok("SetOptions".to_string()), diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs index ef024a42a84bc..c27ce262b092f 100644 --- a/src/query/sql/src/planner/plans/ddl/table.rs +++ b/src/query/sql/src/planner/plans/ddl/table.rs @@ -243,6 +243,23 @@ impl RenameTablePlan { } } +/// Swap table names. +#[derive(Clone, Debug)] +pub struct SwapTablePlan { + pub tenant: Tenant, + pub if_exists: bool, + pub catalog: String, + pub database: String, + pub table: String, + pub target_table: String, +} + +impl SwapTablePlan { + pub fn schema(&self) -> DataSchemaRef { + Arc::new(DataSchema::empty()) + } +} + /// Modify table comment. #[derive(Clone, Debug)] pub struct ModifyTableCommentPlan { diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index ba7d698c0fb3b..343bbdfeb2a22 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -163,6 +163,7 @@ use crate::plans::ShowFileFormatsPlan; use crate::plans::ShowNetworkPoliciesPlan; use crate::plans::ShowTasksPlan; use crate::plans::SuspendWarehousePlan; +use crate::plans::SwapTablePlan; use crate::plans::SystemPlan; use crate::plans::TruncateTablePlan; use crate::plans::UnassignWarehouseNodesPlan; @@ -267,6 +268,7 @@ pub enum Plan { DropTable(Box), UndropTable(Box), RenameTable(Box), + SwapTable(Box), ModifyTableComment(Box), RenameTableColumn(Box), AddTableColumn(Box), diff --git a/src/query/storages/common/session/src/temp_table.rs b/src/query/storages/common/session/src/temp_table.rs index b626f7b7ec649..aa4b718521b82 100644 --- a/src/query/storages/common/session/src/temp_table.rs +++ b/src/query/storages/common/session/src/temp_table.rs @@ -30,6 +30,7 @@ use databend_common_meta_app::schema::GetTableCopiedFileReply; use databend_common_meta_app::schema::GetTableCopiedFileReq; use databend_common_meta_app::schema::RenameTableReply; use databend_common_meta_app::schema::RenameTableReq; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableCopiedFileInfo; use databend_common_meta_app::schema::TableIdent; use databend_common_meta_app::schema::TableInfo; @@ -206,6 +207,10 @@ impl TempTblMgr { } } + pub fn swap_table(&mut self, _req: &SwapTableReq) -> Result> { + Err(ErrorCode::Unimplemented("Cannot swap tmp table")) + } + pub fn get_table_meta_by_id(&self, id: u64) -> Result>> { Ok(self .id_to_table diff --git a/src/query/storages/hive/hive/src/hive_catalog.rs b/src/query/storages/hive/hive/src/hive_catalog.rs index 9f9d5b89224ff..a17f7326f9dde 100644 --- a/src/query/storages/hive/hive/src/hive_catalog.rs +++ b/src/query/storages/hive/hive/src/hive_catalog.rs @@ -82,6 +82,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TruncateTableReply; @@ -544,6 +546,11 @@ impl Catalog for HiveCatalog { )) } + #[async_backtrace::framed] + async fn swap_table(&self, _req: SwapTableReq) -> Result { + unimplemented!() + } + // Check a db.table is exists or not. #[async_backtrace::framed] async fn exists_table(&self, tenant: &Tenant, db_name: &str, table_name: &str) -> Result { diff --git a/src/query/storages/iceberg/src/catalog.rs b/src/query/storages/iceberg/src/catalog.rs index ec7be7e606c42..8b2f8fbc4ca06 100644 --- a/src/query/storages/iceberg/src/catalog.rs +++ b/src/query/storages/iceberg/src/catalog.rs @@ -85,6 +85,8 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyReply; use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq; use databend_common_meta_app::schema::SetTableRowAccessPolicyReply; use databend_common_meta_app::schema::SetTableRowAccessPolicyReq; +use databend_common_meta_app::schema::SwapTableReply; +use databend_common_meta_app::schema::SwapTableReq; use databend_common_meta_app::schema::TableInfo; use databend_common_meta_app::schema::TableMeta; use databend_common_meta_app::schema::TruncateTableReply; @@ -543,6 +545,11 @@ impl Catalog for IcebergMutableCatalog { return Ok(RenameTableReply { table_id: 0 }); } + #[async_backtrace::framed] + async fn swap_table(&self, _req: SwapTableReq) -> Result { + unimplemented!() + } + #[async_backtrace::framed] async fn exists_table(&self, tenant: &Tenant, db_name: &str, table_name: &str) -> Result { let db = self.get_database(tenant, db_name).await?; diff --git a/tests/sqllogictests/suites/base/05_ddl/05_0059_ddl_alter_table_swap.test b/tests/sqllogictests/suites/base/05_ddl/05_0059_ddl_alter_table_swap.test new file mode 100644 index 0000000000000..949cd2f5844ce --- /dev/null +++ b/tests/sqllogictests/suites/base/05_ddl/05_0059_ddl_alter_table_swap.test @@ -0,0 +1,53 @@ +statement ok +CREATE or replace DATABASE swap_test_db; + +statement ok +USE swap_test_db; + +statement ok +CREATE TABLE table_a (id INT, name STRING); + +statement ok +CREATE TABLE table_b (id INT, description STRING, c3 STRING); + +statement ok +INSERT INTO table_a VALUES (1, 'Alice'), (2, 'Bob'); + +statement ok +INSERT INTO table_b VALUES (10, 'Test description 1', 'x'), (20, 'Test description 2', 'y'); + +query TT +SELECT * FROM table_a ORDER BY id; +---- +1 Alice +2 Bob + +query TTT +SELECT * FROM table_b ORDER BY id; +---- +10 Test description 1 x +20 Test description 2 y + +statement ok +ALTER TABLE table_a SWAP WITH table_b; + +query TTT +SELECT * FROM table_a ORDER BY id; +---- +10 Test description 1 x +20 Test description 2 y + +query TT +SELECT * FROM table_b ORDER BY id; +---- +1 Alice +2 Bob + +statement ok +DROP TABLE table_a; + +statement ok +DROP TABLE table_b; + +statement ok +DROP DATABASE swap_test_db;