32 changes: 30 additions & 2 deletions src/meta/api/src/garbage_collection_api.rs
@@ -48,8 +48,10 @@ use display_more::DisplaySliceExt;
use fastrace::func_name;
use futures::StreamExt;
use futures::TryStreamExt;
+use log::debug;
use log::error;
use log::info;
+use log::warn;

use crate::index_api::IndexApi;
use crate::kv_app_error::KVAppError;
@@ -178,6 +180,11 @@ pub async fn get_history_tables_for_gc(
db_id: u64,
limit: usize,
) -> Result<Vec<TableNIV>, KVAppError> {
+info!(
+"get_history_tables_for_gc: db_id {}, limit {}",
+db_id, limit
+);

let ident = TableIdHistoryIdent {
database_id: db_id,
table_name: "dummy".to_string(),
@@ -196,6 +203,15 @@ pub async fn get_history_tables_for_gc(
let mut filter_tb_infos = vec![];
const BATCH_SIZE: usize = 1000;

+let args_len = args.len();
+let mut num_out_of_time_range = 0;
+let mut num_processed = 0;
+
+info!(
+"get_history_tables_for_gc: {} items to process in db {}",
+args_len, db_id
+);

// Process in batches to avoid performance issues
for chunk in args.chunks(BATCH_SIZE) {
// Get table metadata for current batch
@@ -205,15 +221,16 @@ pub async fn get_history_tables_for_gc(
// Filter by drop_time_range for current batch
for (seq_meta, (table_id, table_name)) in seq_metas.into_iter().zip(chunk.iter()) {
let Some(seq_meta) = seq_meta else {
-error!(
+warn!(
"batch_filter_table_info cannot find {:?} table_meta",
table_id
);
continue;
};

if !drop_time_range.contains(&seq_meta.data.drop_on) {
info!("table {:?} is not in drop_time_range", seq_meta.data);
debug!("table {:?} is not in drop_time_range", seq_meta.data);
num_out_of_time_range += 1;
continue;
}

@@ -225,9 +242,20 @@

// Check if we have reached the limit
if filter_tb_infos.len() >= limit {
+info!(
+"get_history_tables_for_gc: reached limit {}, so far collected {}",
+limit,
+filter_tb_infos.len()
+);
return Ok(filter_tb_infos);
}
}

+num_processed += chunk.len();
+info!(
+"get_history_tables_for_gc: processed {}/{}, {} items filtered by time range condition",
+num_processed, args_len, num_out_of_time_range
+);
}

Ok(filter_tb_infos)
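The loop above pairs `chunks(BATCH_SIZE)` with running counters so that a long GC scan emits a progress line per batch and stops early at the limit. Below is a minimal, self-contained sketch of that pattern; the integer items, the `% 2` filter, and `println!` are illustrative stand-ins for the real `TableNIV` stream, the `drop_time_range` check, and the `log` macros:

```rust
// Sketch of batched filtering with per-batch progress logging.
fn filter_in_batches(items: &[u64], limit: usize) -> Vec<u64> {
    const BATCH_SIZE: usize = 1000;
    let total = items.len();
    let mut kept = Vec::new();
    let mut num_filtered_out = 0;
    let mut num_processed = 0;

    for chunk in items.chunks(BATCH_SIZE) {
        for &item in chunk {
            if item % 2 != 0 {
                // Stand-in for "not in drop_time_range": count it and move on.
                num_filtered_out += 1;
                continue;
            }
            kept.push(item);
            if kept.len() >= limit {
                // Stop early once the limit is reached, as the real code does.
                println!("reached limit {}, collected {}", limit, kept.len());
                return kept;
            }
        }
        // Per-batch progress line keeps long-running scans observable.
        num_processed += chunk.len();
        println!(
            "processed {}/{}, {} filtered out",
            num_processed, total, num_filtered_out
        );
    }
    kept
}

fn main() {
    let items: Vec<u64> = (0..2500).collect();
    let kept = filter_in_batches(&items, 10);
    assert_eq!(kept.len(), 10);
}
```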
@@ -189,9 +189,13 @@ pub async fn vacuum_drop_tables_by_table_info(
}
};

let (_, failed_tables) = &result;
let (success_count, failed_count) = (num_tables - failed_tables.len(), failed_tables.len());
info!(
"vacuum {} dropped tables, cost:{:?}",
"vacuum {} dropped tables completed - success: {}, failed: {}, total_cost: {:?}",
num_tables,
success_count,
failed_count,
start.elapsed()
);
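The completion log above derives its success count by subtraction rather than tracking it separately. A small sketch of that accounting, assuming a `(purged, failed_tables)` result tuple (the exact result type is not shown in this hunk):

```rust
use std::time::Instant;

// Sketch: success count = total minus failures, logged with elapsed time.
fn log_vacuum_summary(num_tables: usize, result: &(Vec<String>, Vec<String>), start: Instant) {
    let (_, failed_tables) = result;
    let (success_count, failed_count) = (num_tables - failed_tables.len(), failed_tables.len());
    println!(
        "vacuum {} dropped tables completed - success: {}, failed: {}, total_cost: {:?}",
        num_tables, success_count, failed_count, start.elapsed()
    );
}

fn main() {
    let result = (vec!["t1".to_string()], vec!["t2".to_string()]);
    log_vacuum_summary(2, &result, Instant::now());
}
```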

22 changes: 14 additions & 8 deletions src/query/ee/src/storages/fuse/operations/vacuum_table_v2.rs
@@ -98,6 +98,15 @@ pub async fn do_vacuum2(
}

let fuse_table = FuseTable::try_from_table(table)?;

+let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
+info!(
+"[FUSE-VACUUM2] Table {} has no snapshot, stopping vacuum",
+fuse_table.get_table_info().desc
+);
+return Ok(vec![]);
+};

let start = std::time::Instant::now();

let retention_policy = fuse_table.get_data_retention_policy(ctx.as_ref())?;
@@ -122,7 +131,9 @@
// A zero retention period indicates that we should vacuum all the historical snapshots
is_vacuum_all = retention_period.is_zero();

-let Some(lvt) = set_lvt(fuse_table, ctx.as_ref(), retention_period).await? else {
+let Some(lvt) =
+set_lvt(fuse_table, latest_snapshot, ctx.as_ref(), retention_period).await?
+else {
return Ok(vec![]);
};
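The hunks above and below hoist the snapshot read out of `set_lvt`: `do_vacuum2` now performs the single `read_table_snapshot` call, returns quietly when the table has no snapshot, and hands the `Arc<TableSnapshot>` down to `set_lvt`. A reduced sketch of that shape, with stand-in types and synchronous calls for brevity:

```rust
use std::sync::Arc;

// Stand-ins for the real FuseTable / TableSnapshot types.
struct Snapshot { id: u128 }
struct Table { snapshot: Option<Arc<Snapshot>> }

impl Table {
    // Stand-in for the async read_table_snapshot() call.
    fn read_table_snapshot(&self) -> Option<Arc<Snapshot>> {
        self.snapshot.clone()
    }
}

fn do_vacuum(table: &Table) -> Vec<String> {
    // Read the snapshot exactly once, up front; bail out quietly if absent.
    let Some(latest_snapshot) = table.read_table_snapshot() else {
        println!("table has no snapshot, stopping vacuum");
        return vec![];
    };
    // Helpers now receive the snapshot by value instead of re-reading it.
    let _lvt = set_lvt(latest_snapshot);
    vec![]
}

fn set_lvt(latest_snapshot: Arc<Snapshot>) -> u128 {
    latest_snapshot.id
}

fn main() {
    let empty = Table { snapshot: None };
    assert!(do_vacuum(&empty).is_empty()); // early return, no panic
}
```

Reading the snapshot once up front is also what makes the later `fuse_table.snapshot_loc().unwrap()` safe, as the new comment in the following hunk notes.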

@@ -153,6 +164,7 @@
fuse_table
.meta_location_generator()
.snapshot_location_prefix(),
+// Safe to unwrap here: we have checked that `fuse_table` has a snapshot
fuse_table.snapshot_loc().unwrap().as_str(),
need_one_more,
None,
@@ -433,16 +445,10 @@ async fn collect_gc_candidates_by_retention_period(
/// Returning `None` means we stop vacuuming but don't want to report an error to the user.
async fn set_lvt(
fuse_table: &FuseTable,
latest_snapshot: Arc<TableSnapshot>,
ctx: &dyn TableContext,
retention_period: TimeDelta,
) -> Result<Option<DateTime<Utc>>> {
-let Some(latest_snapshot) = fuse_table.read_table_snapshot().await? else {
-info!(
-"[FUSE-VACUUM2] Table {} has no snapshot, stopping vacuum",
-fuse_table.get_table_info().desc
-);
-return Ok(None);
-};
if !is_uuid_v7(&latest_snapshot.snapshot_id) {
info!(
"[FUSE-VACUUM2] Latest snapshot is not v7, stopping vacuum: {:?}",
@@ -58,7 +58,7 @@ impl VacuumDropTablesInterpreter {
drop_ids: Vec<DroppedId>,
) -> Result<()> {
info!(
"vacuum drop table from db {:?}, gc_drop_tables",
"vacuum metadata of dropped table from db {:?}",
self.plan.database,
);

@@ -75,6 +75,12 @@ impl VacuumDropTablesInterpreter {
}
}

+info!(
+"found {} database metadata and {} table metadata entries that need to be cleaned",
+drop_db_ids.len(),
+drop_db_table_ids.len()
+);

let chunk_size = 50;

// first gc drop table ids
@@ -124,8 +130,10 @@ impl Interpreter for VacuumDropTablesInterpreter {
let retention_time = chrono::Utc::now() - duration;
let catalog = self.ctx.get_catalog(self.plan.catalog.as_str()).await?;
info!(
"vacuum drop table from db {:?}, duration: {:?}, retention_time: {:?}",
self.plan.database, duration, retention_time
"=== VACUUM DROP TABLE STARTED === db: {:?}, retention_days: {}, retention_time: {:?}",
self.plan.database,
ctx.get_settings().get_data_retention_time_in_days()?,
retention_time
);
// if database is empty, vacuum all tables
let database_name = if self.plan.database.is_empty() {
@@ -153,13 +161,18 @@ impl Interpreter for VacuumDropTablesInterpreter {
}

info!(
"vacuum drop table from db {:?}, get_drop_table_infos return tables: {:?},tables.len: {:?}, drop_ids: {:?}",
"vacuum drop table from db {:?}, found {} tables: [{}], drop_ids: {:?}",
self.plan.database,
tables.len(),
tables
.iter()
.map(|t| t.get_table_info())
.collect::<Vec<_>>(),
tables.len(),
.map(|t| format!(
"{}(id:{})",
t.get_table_info().name,
t.get_table_info().ident.table_id
))
.collect::<Vec<_>>()
.join(", "),
drop_ids
);
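The reworked log lines render each table as a compact `name(id:...)` entry instead of dumping full `TableInfo` debug output. A sketch of that formatting in isolation; `TableInfoLite` is a hypothetical stand-in for the handful of `TableInfo` fields the log actually uses:

```rust
// Hypothetical stand-in for the fields of TableInfo used by the log line.
struct TableInfoLite {
    name: String,
    table_id: u64,
}

// Renders "name(id:..)" entries joined with ", ", mirroring the diff above.
fn format_table_list(tables: &[TableInfoLite]) -> String {
    tables
        .iter()
        .map(|t| format!("{}(id:{})", t.name, t.table_id))
        .collect::<Vec<_>>()
        .join(", ")
}

fn main() {
    let tables = vec![
        TableInfoLite { name: "t1".into(), table_id: 42 },
        TableInfoLite { name: "t2".into(), table_id: 43 },
    ];
    // Prints: t1(id:42), t2(id:43)
    println!("{}", format_table_list(&tables));
}
```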

@@ -176,12 +189,17 @@ impl Interpreter for VacuumDropTablesInterpreter {
}

info!(
"after filter read-only tables: {:?}, tables.len: {:?}",
"after filter read-only tables: {} tables remain: [{}]",
tables.len(),
tables
.iter()
.map(|t| t.get_table_info())
.collect::<Vec<_>>(),
tables.len()
.map(|t| format!(
"{}(id:{})",
t.get_table_info().name,
t.get_table_info().ident.table_id
))
.collect::<Vec<_>>()
.join(", ")
);

let tables_count = tables.len();
@@ -226,17 +244,30 @@
}
}
info!(
"failed dbs:{:?}, failed_tables:{:?}, success_drop_ids:{:?}",
failed_db_ids, failed_tables, success_dropped_ids
"vacuum drop table summary - failed dbs: {}, failed tables: {}, successfully cleaned: {} items",
failed_db_ids.len(),
failed_tables.len(),
success_dropped_ids.len()
);
if !failed_tables.is_empty() {
info!("failed table ids: {:?}", failed_tables);
}

self.gc_drop_tables(catalog, success_dropped_ids).await?;
}

+let success_count = tables_count as u64 - failed_tables.len() as u64;
+let failed_count = failed_tables.len() as u64;
+
+info!(
+"=== VACUUM DROP TABLE COMPLETED === success: {}, failed: {}, total: {}",
+success_count, failed_count, tables_count
+);

match files_opt {
None => PipelineBuildResult::from_blocks(vec![DataBlock::new_from_columns(vec![
-UInt64Type::from_data(vec![tables_count as u64 - failed_tables.len() as u64]),
-UInt64Type::from_data(vec![failed_tables.len() as u64]),
+UInt64Type::from_data(vec![success_count]),
+UInt64Type::from_data(vec![failed_count]),
])]),
Some(purge_files) => {
let mut len = min(purge_files.len(), DRY_RUN_LIMIT);
@@ -0,0 +1,14 @@
statement ok
create or replace database issue_18743;

statement ok
use issue_18743;

statement ok
CREATE OR REPLACE TABLE t(c int);

statement ok
call system$fuse_vacuum2('issue_18743', 't');

statement ok
call system$fuse_vacuum2();
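This test presumably pins down the issue_18743 regression: `t` is created but never written to, so it has no snapshot yet, and `fuse_vacuum2` should take the new no-snapshot early return in `do_vacuum2` and succeed rather than error; the zero-argument call exercises the same path without naming a specific table.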