Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 175 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ use databend_common_meta_app::schema::SetTableColumnMaskPolicyAction;
use databend_common_meta_app::schema::SetTableColumnMaskPolicyReq;
use databend_common_meta_app::schema::SetTableRowAccessPolicyAction;
use databend_common_meta_app::schema::SetTableRowAccessPolicyReq;
use databend_common_meta_app::schema::SwapTableReq;
use databend_common_meta_app::schema::TableCopiedFileInfo;
use databend_common_meta_app::schema::TableCopiedFileNameIdent;
use databend_common_meta_app::schema::TableId;
Expand Down Expand Up @@ -309,6 +310,7 @@ impl SchemaApiTestSuite {
.drop_table_without_table_id_list(&b.build().await)
.await?;
suite.table_rename(&b.build().await).await?;
suite.table_swap(&b.build().await).await?;
suite.table_update_meta(&b.build().await).await?;
suite.table_update_mask_policy(&b.build().await).await?;
suite
Expand Down Expand Up @@ -2367,6 +2369,179 @@ impl SchemaApiTestSuite {
Ok(())
}

#[fastrace::trace]
async fn table_swap<MT: SchemaApi + kvapi::KVApi<Error = MetaError>>(
&self,
mt: &MT,
) -> anyhow::Result<()> {
let tenant_name = "tenant1";
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;

let db1_name = "db1";
let tb1_name = "table_a";
let tb2_name = "table_b";

let schema = || {
Arc::new(TableSchema::new(vec![TableField::new(
"number",
TableDataType::Number(NumberDataType::UInt64),
)]))
};

let _table_meta = |created_on| TableMeta {
schema: schema(),
engine: "JSON".to_string(),
options: maplit::btreemap! {"opt-1".into() => "val-1".into()},
created_on,
..TableMeta::default()
};

let swap_req = |if_exists| SwapTableReq {
if_exists,
origin_table: TableNameIdent {
tenant: tenant.clone(),
db_name: db1_name.to_string(),
table_name: tb1_name.to_string(),
},
target_table_name: tb2_name.to_string(),
};

info!("--- swap tables on unknown db");
{
let got = mt.swap_table(swap_req(false)).await;
debug!("--- swap tables on unknown database got: {:?}", got);

assert!(got.is_err());
assert_eq!(
ErrorCode::UNKNOWN_DATABASE,
ErrorCode::from(got.unwrap_err()).code()
);
}

info!("--- prepare db and tables");
let created_on = Utc::now();
let mut util1 = Util::new(mt, tenant_name, db1_name, tb1_name, "JSON");
util1.create_db().await?;

let mut util2 = Util::new(mt, tenant_name, db1_name, tb2_name, "JSON");

info!("--- create table_a");
let tb1_ident = {
let old_db = util1.get_database().await?;
let (table_id, _table_meta) = util1
.create_table_with(
|mut meta| {
meta.schema = schema();
meta.created_on = created_on;
meta
},
|req| req,
)
.await?;
let cur_db = util1.get_database().await?;
assert!(old_db.meta.seq < cur_db.meta.seq);
let got = util1.get_table().await?;
assert_eq!(table_id, got.ident.table_id);
got.ident
};

info!("--- create table_b");
let tb2_ident = {
let old_db = util2.get_database().await?;
let (table_id, _table_meta) = util2
.create_table_with(
|mut meta| {
meta.schema = schema();
meta.created_on = created_on;
meta
},
|req| req,
)
.await?;
let cur_db = util2.get_database().await?;
assert!(old_db.meta.seq < cur_db.meta.seq);
let got = util2.get_table().await?;
assert_eq!(table_id, got.ident.table_id);
got.ident
};

info!("--- swap tables, both exist, ok");
{
let old_db = util1.get_database().await?;
let _reply = mt.swap_table(swap_req(false)).await?;
let cur_db = util1.get_database().await?;
assert!(old_db.meta.seq < cur_db.meta.seq);

// Verify tables are swapped
let got_a = mt
.get_table((tenant_name, db1_name, tb1_name).into())
.await?;
let got_b = mt
.get_table((tenant_name, db1_name, tb2_name).into())
.await?;

// table_a name should now point to table_b's id
assert_eq!(tb2_ident.table_id, got_a.ident.table_id);
// table_b name should now point to table_a's id
assert_eq!(tb1_ident.table_id, got_b.ident.table_id);
}

info!("--- swap tables again, should restore original mapping");
{
let _reply = mt.swap_table(swap_req(false)).await?;

// Verify tables are swapped back
let got_a = mt
.get_table((tenant_name, db1_name, tb1_name).into())
.await?;
let got_b = mt
.get_table((tenant_name, db1_name, tb2_name).into())
.await?;

// table_a name should point back to table_a's id
assert_eq!(tb1_ident.table_id, got_a.ident.table_id);
// table_b name should point back to table_b's id
assert_eq!(tb2_ident.table_id, got_b.ident.table_id);
}

info!("--- swap non-existent table with if_exists=false, error");
{
let swap_req_nonexist = SwapTableReq {
if_exists: false,
origin_table: TableNameIdent {
tenant: tenant.clone(),
db_name: db1_name.to_string(),
table_name: "non_existent".to_string(),
},
target_table_name: tb2_name.to_string(),
};

let got = mt.swap_table(swap_req_nonexist).await;
assert!(got.is_err());
assert_eq!(
ErrorCode::UNKNOWN_TABLE,
ErrorCode::from(got.unwrap_err()).code()
);
}

info!("--- swap non-existent table with if_exists=true, ok");
{
let swap_req_nonexist = SwapTableReq {
if_exists: true,
origin_table: TableNameIdent {
tenant: tenant.clone(),
db_name: db1_name.to_string(),
table_name: "non_existent".to_string(),
},
target_table_name: tb2_name.to_string(),
};

assert!(mt.swap_table(swap_req_nonexist).await.is_ok());
}

Ok(())
}

#[fastrace::trace]
async fn table_update_meta<MT: SchemaApi + kvapi::KVApi<Error = MetaError>>(
&self,
Expand Down
188 changes: 188 additions & 0 deletions src/meta/api/src/table_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ use databend_common_meta_app::schema::ListDroppedTableResp;
use databend_common_meta_app::schema::ListTableReq;
use databend_common_meta_app::schema::RenameTableReply;
use databend_common_meta_app::schema::RenameTableReq;
use databend_common_meta_app::schema::SwapTableReply;
use databend_common_meta_app::schema::SwapTableReq;
use databend_common_meta_app::schema::TableCopiedFileNameIdent;
use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableIdHistoryIdent;
Expand Down Expand Up @@ -713,6 +715,192 @@ where
}
}

#[logcall::logcall]
#[fastrace::trace]
async fn swap_table(&self, req: SwapTableReq) -> Result<SwapTableReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());

let mut trials = txn_backoff(None, func_name!());
loop {
trials.next().unwrap()?.await;

// Get databases
let tenant_dbname_left = req.origin_table.db_name_ident();

let (seq_db_id_left, db_meta_left) =
get_db_or_err(self, &tenant_dbname_left, "swap_table: tenant_dbname_left").await?;

let dbid_tbname_left = DBIdTableName {
db_id: *seq_db_id_left.data,
table_name: req.origin_table.table_name.clone(),
};

let (tb_id_seq_left, table_id_left) = get_u64_value(self, &dbid_tbname_left).await?;
if req.if_exists && tb_id_seq_left == 0 {
return Ok(SwapTableReply {});
}
assert_table_exist(
tb_id_seq_left,
&req.origin_table,
"swap_table: origin_table",
)?;

let dbid_tbname_right = DBIdTableName {
db_id: *seq_db_id_left.data,
table_name: req.target_table_name.clone(),
};

let (tb_id_seq_right, table_id_right) = get_u64_value(self, &dbid_tbname_right).await?;
if req.if_exists && tb_id_seq_right == 0 {
return Ok(SwapTableReply {});
}
assert_table_exist(
tb_id_seq_right,
&TableNameIdent {
tenant: req.origin_table.tenant.clone(),
db_name: req.origin_table.db_name.clone(),
table_name: req.target_table_name.clone(),
},
"swap_table: target_table",
)?;

// Get table id lists
let dbid_tbname_idlist_left = TableIdHistoryIdent {
database_id: *seq_db_id_left.data,
table_name: req.origin_table.table_name.clone(),
};
let dbid_tbname_idlist_right = TableIdHistoryIdent {
database_id: *seq_db_id_left.data,
table_name: req.target_table_name.clone(),
};

let seq_table_history_left = self.get_pb(&dbid_tbname_idlist_left).await?;
let seq_table_history_right = self.get_pb(&dbid_tbname_idlist_right).await?;

let tb_id_list_seq_left = seq_table_history_left.seq();
let tb_id_list_seq_right = seq_table_history_right.seq();

let mut tb_id_list_left = seq_table_history_left
.into_value()
.unwrap_or_else(|| TableIdList::new_with_ids([table_id_left]));
let mut tb_id_list_right = seq_table_history_right
.into_value()
.unwrap_or_else(|| TableIdList::new_with_ids([table_id_right]));

// Validate table IDs in history lists
{
let last_left = tb_id_list_left.last().copied();
if Some(table_id_left) != last_left {
let err_message = format!(
"swap_table {:?} but last table id conflict, id list last: {:?}, current: {}",
req.origin_table, last_left, table_id_left
);
error!("{}", err_message);
return Err(KVAppError::AppError(AppError::UnknownTable(
UnknownTable::new(&req.origin_table.table_name, err_message),
)));
}

let last_right = tb_id_list_right.last().copied();
if Some(table_id_right) != last_right {
let err_message = format!(
"swap_table {:?} but last table id conflict, id list last: {:?}, current: {}",
req.target_table_name, last_right, table_id_right
);
error!("{}", err_message);
return Err(KVAppError::AppError(AppError::UnknownTable(
UnknownTable::new(&req.target_table_name, err_message),
)));
}
}

// Get table id to name mappings
let table_id_to_name_key_left = TableIdToName {
table_id: table_id_left,
};
let table_id_to_name_key_right = TableIdToName {
table_id: table_id_right,
};
let table_id_to_name_seq_left = self.get_seq(&table_id_to_name_key_left).await?;
let table_id_to_name_seq_right = self.get_seq(&table_id_to_name_key_right).await?;

// Prepare new mappings after swap
let db_id_table_name_left = DBIdTableName {
db_id: *seq_db_id_left.data,
table_name: req.origin_table.table_name.clone(),
};
let db_id_table_name_right = DBIdTableName {
db_id: *seq_db_id_left.data,
table_name: req.target_table_name.clone(),
};

{
// Update history lists: remove current table IDs
tb_id_list_left.pop();
tb_id_list_right.pop();
// Add swapped table IDs
tb_id_list_left.append(table_id_right);
tb_id_list_right.append(table_id_left);

let txn = TxnRequest::new(
vec![
// Ensure databases haven't changed
txn_cond_seq(&seq_db_id_left.data, Eq, db_meta_left.seq),
// Ensure table name->table_id mappings haven't changed
txn_cond_seq(&dbid_tbname_left, Eq, tb_id_seq_left),
txn_cond_seq(&dbid_tbname_right, Eq, tb_id_seq_right),
// Ensure table history lists haven't changed
txn_cond_seq(&dbid_tbname_idlist_left, Eq, tb_id_list_seq_left),
txn_cond_seq(&dbid_tbname_idlist_right, Eq, tb_id_list_seq_right),
// Ensure table_id->name mappings haven't changed
txn_cond_seq(&table_id_to_name_key_left, Eq, table_id_to_name_seq_left),
txn_cond_seq(&table_id_to_name_key_right, Eq, table_id_to_name_seq_right),
],
vec![
// Swap table name->table_id mappings
txn_op_put(&dbid_tbname_left, serialize_u64(table_id_right)?), /* origin_table_name -> target_table_id */
txn_op_put(&dbid_tbname_right, serialize_u64(table_id_left)?), /* target_table_name -> origin_table_id */
// Update database metadata sequences
txn_op_put(&seq_db_id_left.data, serialize_struct(&*db_meta_left)?),
// Update table history lists
txn_op_put(
&dbid_tbname_idlist_left,
serialize_struct(&tb_id_list_left)?,
),
txn_op_put(
&dbid_tbname_idlist_right,
serialize_struct(&tb_id_list_right)?,
),
// Update table_id->name mappings
txn_op_put(
&table_id_to_name_key_left,
serialize_struct(&db_id_table_name_right)?,
), // origin_table_id -> target_table_name
txn_op_put(
&table_id_to_name_key_right,
serialize_struct(&db_id_table_name_left)?,
), // target_table_id -> origin_table_name
],
);

let (succ, _responses) = send_txn(self, txn).await?;

debug!(
origin_table :? =(&req.origin_table),
target_table_name :? =(&req.target_table_name),
table_id_left :? =(&table_id_left),
table_id_right :? =(&table_id_right),
succ = succ;
"swap_table"
);

if succ {
return Ok(SwapTableReply {});
}
}
}
}

#[logcall::logcall]
#[fastrace::trace]
async fn truncate_table(
Expand Down
Loading
Loading