Skip to content

Commit

Permalink
feat(wal): try to drop table & remove_ts_family when recover wal
Browse files Browse the repository at this point in the history
  • Loading branch information
zipper-meng committed Jul 10, 2023
1 parent 10ceb47 commit 0499afd
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 31 deletions.
3 changes: 3 additions & 0 deletions tskv/src/database.rs
Expand Up @@ -201,6 +201,9 @@ impl Database {
pub async fn del_tsfamily(&mut self, tf_id: u32, summary_task_sender: Sender<SummaryTask>) {
if let Some(tf) = self.ts_families.remove(&tf_id) {
tf.read().await.close();
} else {
// If no ts_family recovered from summary, do not write summary.
return;
}

let edits = vec![VersionEdit::new_del_vnode(tf_id)];
Expand Down
13 changes: 13 additions & 0 deletions tskv/src/engine_mock.rs
Expand Up @@ -55,6 +55,15 @@ impl Engine for MockEngine {
Ok(())
}

async fn remove_tsfamily_from_wal(
&self,
tenant: &str,
database: &str,
vnode_id: VnodeId,
) -> Result<()> {
Ok(())
}

async fn flush_tsfamily(&self, tenant: &str, database: &str, id: u32) -> Result<()> {
Ok(())
}
Expand Down Expand Up @@ -89,6 +98,10 @@ impl Engine for MockEngine {
Ok(())
}

async fn drop_table_from_wal(&self, tenant: &str, database: &str, table: &str) -> Result<()> {
Ok(())
}

async fn delete_series(
&self,
tenant: &str,
Expand Down
43 changes: 43 additions & 0 deletions tskv/src/kvcore.rs
Expand Up @@ -637,6 +637,15 @@ impl Engine for TsKv {
database::delete_table_async(tenant, database, table, version_set).await
}

async fn drop_table_from_wal(&self, tenant: &str, database: &str, table: &str) -> Result<()> {
let version_set = self.version_set.clone();
let database = database.to_string();
let table = table.to_string();
let tenant = tenant.to_string();

database::delete_table_async(tenant, database, table, version_set).await
}

async fn remove_tsfamily(&self, tenant: &str, database: &str, vnode_id: VnodeId) -> Result<()> {
if let Some(db) = self.version_set.read().await.get_db(tenant, database) {
// Store this action in WAL.
Expand Down Expand Up @@ -680,6 +689,40 @@ impl Engine for TsKv {
Ok(())
}

async fn remove_tsfamily_from_wal(
&self,
tenant: &str,
database: &str,
vnode_id: VnodeId,
) -> Result<()> {
if let Some(db) = self.version_set.read().await.get_db(tenant, database) {
let mut db_wlock = db.write().await;
db_wlock.del_ts_index(vnode_id);
db_wlock
.del_tsfamily(vnode_id, self.summary_task_sender.clone())
.await;

let ts_dir = self
.options
.storage
.ts_family_dir(&make_owner(tenant, database), vnode_id);
match std::fs::remove_dir_all(&ts_dir) {
Ok(()) => {
info!("Removed TsFamily directory '{}'", ts_dir.display());
}
Err(e) => {
error!(
"Failed to remove TsFamily directory '{}': {}",
ts_dir.display(),
e
);
}
}
}

Ok(())
}

async fn prepare_copy_vnode(
&self,
tenant: &str,
Expand Down
16 changes: 16 additions & 0 deletions tskv/src/lib.rs
Expand Up @@ -105,10 +105,26 @@ pub trait Engine: Send + Sync + Debug {
/// Delete all data of a table.
async fn drop_table(&self, tenant: &str, database: &str, table: &str) -> Result<()>;

/// Delete all data of a table.
///
/// Data is from the WAL(write-ahead-log), so won't write back to WAL.
async fn drop_table_from_wal(&self, tenant: &str, database: &str, table: &str) -> Result<()>;

/// Remove the storage unit(caches and files) managed by engine,
/// then remove directory of the storage unit.
async fn remove_tsfamily(&self, tenant: &str, database: &str, vnode_id: VnodeId) -> Result<()>;

/// Remove the storage unit(caches and files) managed by engine,
/// then remove directory of the storage unit.
///
/// Data is from the WAL(write-ahead-log), so won't write back to WAL.
async fn remove_tsfamily_from_wal(
&self,
tenant: &str,
database: &str,
vnode_id: VnodeId,
) -> Result<()>;

/// Mark the storage unit as `Copying` and flush caches.
async fn prepare_copy_vnode(
&self,
Expand Down
99 changes: 68 additions & 31 deletions tskv/src/wal/mod.rs
Expand Up @@ -469,40 +469,77 @@ impl WalManager {
continue;
}
seq_gt_min_seq = true;
if let WalEntry::Write(blk) = wal_entry_blk.entry {
decoded_data.truncate(0);
decoder
.decode(blk.points(), &mut decoded_data)
.context(error::DecodeSnafu)?;
if decoded_data.is_empty() {
continue;
}
let vnode_id = blk.vnode_id();
if let Some(tsf_last_seq) = vnode_last_seq_map.get(&vnode_id) {
// If `seq_no` of TsFamily is greater than or equal to `seq`,
// it means that data was writen to tsm.
if *tsf_last_seq >= seq {
match wal_entry_blk.entry {
WalEntry::Write(blk) => {
decoded_data.truncate(0);
decoder
.decode(blk.points(), &mut decoded_data)
.context(error::DecodeSnafu)?;
if decoded_data.is_empty() {
continue;
}
let vnode_id = blk.vnode_id();
if let Some(tsf_last_seq) = vnode_last_seq_map.get(&vnode_id) {
// If `seq_no` of TsFamily is greater than or equal to `seq`,
// it means that data was writen to tsm.
if *tsf_last_seq >= seq {
continue;
}
}
let tenant =
unsafe { String::from_utf8_unchecked(blk.tenant().to_vec()) };
let precision = blk.precision();
let req = WritePointsRequest {
version: 1,
meta: Some(Meta {
tenant,
user: None,
password: None,
}),
points: decoded_data[0].to_vec(),
};
engine
.write_from_wal(vnode_id, precision, req, seq)
.await
.unwrap();
}

WalEntry::DeleteVnode(blk) => {
let vnode_id = blk.vnode_id();
let tenant =
unsafe { String::from_utf8_unchecked(blk.tenant().to_vec()) };
let database =
unsafe { String::from_utf8_unchecked(blk.database().to_vec()) };
trace::info!("Recover: delete vnode, tenant: {}, database: {}, vnode_id: {vnode_id}", &tenant, &database);
if let Err(e) = engine
.remove_tsfamily_from_wal(&tenant, &database, vnode_id)
.await
{
// Ignore delete vnode error.
trace::error!("Recover: failed to delete vnode, tenant: {tenant}, database: {database}, vnode_id: {vnode_id}, error: {e}");
}
}
WalEntry::DeleteTable(blk) => {
let tenant =
unsafe { String::from_utf8_unchecked(blk.tenant().to_vec()) };
let database =
unsafe { String::from_utf8_unchecked(blk.database().to_vec()) };
let table =
unsafe { String::from_utf8_unchecked(blk.table().to_vec()) };
trace::info!(
"Recover: delete table, tenant: {}, database: {}, table: {}",
&tenant,
&database,
&table
);
if let Err(e) =
engine.drop_table_from_wal(&tenant, &database, &table).await
{
// Ignore delete vnode error.
trace::error!("Recover: failed to delete table, tenant: {tenant}, database: {database}, table: {table}, error: {e}");
}
}
let tenant = unsafe { String::from_utf8_unchecked(blk.tenant().to_vec()) };
let precision = blk.precision();
let req = WritePointsRequest {
version: 1,
meta: Some(Meta {
tenant,
user: None,
password: None,
}),
points: decoded_data[0].to_vec(),
};
engine
.write_from_wal(vnode_id, precision, req, seq)
.await
.unwrap();
} else {
// WalEntry::DeleteVnode(_) => do nothing
// WalEntry::DeleteTable(_) => do nothing
_ => {}
}
}
Ok(None) | Err(Error::WalTruncated) => {
Expand Down

0 comments on commit 0499afd

Please sign in to comment.