Skip to content

Commit

Permalink
feat(wal): try to drop table & remove_ts_family when recover wal
Browse files Browse the repository at this point in the history
  • Loading branch information
zipper-meng committed Jul 14, 2023
1 parent 1c08691 commit 2aeed92
Show file tree
Hide file tree
Showing 9 changed files with 165 additions and 50 deletions.
7 changes: 6 additions & 1 deletion meta/src/bin/main.rs
Expand Up @@ -131,7 +131,12 @@ async fn detect_node_heartbeat(heartbeat_config: HeartBeatConfig, app: Data<Meta
interval.tick().await;

if let Ok(_leader) = app.raft.is_leader().await {
let opt_list = app.store.state_machine.write().await.children_data::<NodeMetrics>(&metrics_path);
let opt_list = app
.store
.state_machine
.write()
.await
.children_data::<NodeMetrics>(&metrics_path);

if let Ok(list) = opt_list {
let node_metrics_list: Vec<NodeMetrics> = list.into_values().collect();
Expand Down
3 changes: 3 additions & 0 deletions tskv/src/database.rs
Expand Up @@ -201,6 +201,9 @@ impl Database {
pub async fn del_tsfamily(&mut self, tf_id: u32, summary_task_sender: Sender<SummaryTask>) {
if let Some(tf) = self.ts_families.remove(&tf_id) {
tf.read().await.close();
} else {
// If no ts_family recovered from summary, do not write summary.
return;
}

let edits = vec![VersionEdit::new_del_vnode(tf_id)];
Expand Down
13 changes: 13 additions & 0 deletions tskv/src/engine_mock.rs
Expand Up @@ -55,6 +55,15 @@ impl Engine for MockEngine {
Ok(())
}

async fn remove_tsfamily_from_wal(
&self,
tenant: &str,
database: &str,
vnode_id: VnodeId,
) -> Result<()> {
Ok(())
}

async fn flush_tsfamily(&self, tenant: &str, database: &str, id: u32) -> Result<()> {
Ok(())
}
Expand Down Expand Up @@ -89,6 +98,10 @@ impl Engine for MockEngine {
Ok(())
}

async fn drop_table_from_wal(&self, tenant: &str, database: &str, table: &str) -> Result<()> {
Ok(())
}

async fn delete_series(
&self,
tenant: &str,
Expand Down
45 changes: 44 additions & 1 deletion tskv/src/kvcore.rs
Expand Up @@ -637,6 +637,15 @@ impl Engine for TsKv {
database::delete_table_async(tenant, database, table, version_set).await
}

async fn drop_table_from_wal(&self, tenant: &str, database: &str, table: &str) -> Result<()> {
let version_set = self.version_set.clone();
let database = database.to_string();
let table = table.to_string();
let tenant = tenant.to_string();

database::delete_table_async(tenant, database, table, version_set).await
}

async fn remove_tsfamily(&self, tenant: &str, database: &str, vnode_id: VnodeId) -> Result<()> {
if let Some(db) = self.version_set.read().await.get_db(tenant, database) {
// Store this action in WAL.
Expand Down Expand Up @@ -680,6 +689,40 @@ impl Engine for TsKv {
Ok(())
}

async fn remove_tsfamily_from_wal(
&self,
tenant: &str,
database: &str,
vnode_id: VnodeId,
) -> Result<()> {
if let Some(db) = self.version_set.read().await.get_db(tenant, database) {
let mut db_wlock = db.write().await;
db_wlock.del_ts_index(vnode_id);
db_wlock
.del_tsfamily(vnode_id, self.summary_task_sender.clone())
.await;

let ts_dir = self
.options
.storage
.ts_family_dir(&make_owner(tenant, database), vnode_id);
match std::fs::remove_dir_all(&ts_dir) {
Ok(()) => {
info!("Removed TsFamily directory '{}'", ts_dir.display());
}
Err(e) => {
error!(
"Failed to remove TsFamily directory '{}': {}",
ts_dir.display(),
e
);
}
}
}

Ok(())
}

async fn prepare_copy_vnode(
&self,
tenant: &str,
Expand Down Expand Up @@ -759,7 +802,7 @@ impl Engine for TsKv {
})?;
let column_id = schema
.column(column_name)
.ok_or_else(|| SchemaError::FiledNotFound {
.ok_or_else(|| SchemaError::FieldNotFound {
database: database.to_string(),
table: table.to_string(),
field: column_name.to_string(),
Expand Down
16 changes: 16 additions & 0 deletions tskv/src/lib.rs
Expand Up @@ -105,10 +105,26 @@ pub trait Engine: Send + Sync + Debug {
/// Delete all data of a table.
async fn drop_table(&self, tenant: &str, database: &str, table: &str) -> Result<()>;

/// Delete all data of a table.
///
/// Data is from the WAL(write-ahead-log), so won't write back to WAL.
async fn drop_table_from_wal(&self, tenant: &str, database: &str, table: &str) -> Result<()>;

/// Remove the storage unit(caches and files) managed by engine,
/// then remove directory of the storage unit.
async fn remove_tsfamily(&self, tenant: &str, database: &str, vnode_id: VnodeId) -> Result<()>;

/// Remove the storage unit(caches and files) managed by engine,
/// then remove directory of the storage unit.
///
/// Data is from the WAL(write-ahead-log), so won't write back to WAL.
async fn remove_tsfamily_from_wal(
&self,
tenant: &str,
database: &str,
vnode_id: VnodeId,
) -> Result<()>;

/// Mark the storage unit as `Copying` and flush caches.
async fn prepare_copy_vnode(
&self,
Expand Down
2 changes: 1 addition & 1 deletion tskv/src/schema/error.rs
Expand Up @@ -23,7 +23,7 @@ pub enum SchemaError {
},

#[snafu(display("field '{database}.{table}'.'{}' not found", field))]
FiledNotFound {
FieldNotFound {
database: String,
table: String,
field: String,
Expand Down
4 changes: 2 additions & 2 deletions tskv/src/schema/schemas.rs
Expand Up @@ -68,7 +68,7 @@ impl DBschemas {
});
}
} else {
return Err(SchemaError::FiledNotFound {
return Err(SchemaError::FieldNotFound {
database: self.database_name(),
table: table_name.to_string(),
field: field_name.to_string(),
Expand All @@ -85,7 +85,7 @@ impl DBschemas {
});
}
} else {
return Err(SchemaError::FiledNotFound {
return Err(SchemaError::FieldNotFound {
database: self.database_name(),
table: table_name.to_string(),
field: tag_name.to_string(),
Expand Down
99 changes: 68 additions & 31 deletions tskv/src/wal/mod.rs
Expand Up @@ -469,40 +469,77 @@ impl WalManager {
continue;
}
seq_gt_min_seq = true;
if let WalEntry::Write(blk) = wal_entry_blk.entry {
decoded_data.truncate(0);
decoder
.decode(blk.points(), &mut decoded_data)
.context(error::DecodeSnafu)?;
if decoded_data.is_empty() {
continue;
}
let vnode_id = blk.vnode_id();
if let Some(tsf_last_seq) = vnode_last_seq_map.get(&vnode_id) {
// If `seq_no` of TsFamily is greater than or equal to `seq`,
// it means that data was writen to tsm.
if *tsf_last_seq >= seq {
match wal_entry_blk.entry {
WalEntry::Write(blk) => {
decoded_data.truncate(0);
decoder
.decode(blk.points(), &mut decoded_data)
.context(error::DecodeSnafu)?;
if decoded_data.is_empty() {
continue;
}
let vnode_id = blk.vnode_id();
if let Some(tsf_last_seq) = vnode_last_seq_map.get(&vnode_id) {
// If `seq_no` of TsFamily is greater than or equal to `seq`,
// it means that data was writen to tsm.
if *tsf_last_seq >= seq {
continue;
}
}
let tenant =
unsafe { String::from_utf8_unchecked(blk.tenant().to_vec()) };
let precision = blk.precision();
let req = WritePointsRequest {
version: 1,
meta: Some(Meta {
tenant,
user: None,
password: None,
}),
points: decoded_data[0].to_vec(),
};
engine
.write_from_wal(vnode_id, precision, req, seq)
.await
.unwrap();
}

WalEntry::DeleteVnode(blk) => {
let vnode_id = blk.vnode_id();
let tenant =
unsafe { String::from_utf8_unchecked(blk.tenant().to_vec()) };
let database =
unsafe { String::from_utf8_unchecked(blk.database().to_vec()) };
trace::info!("Recover: delete vnode, tenant: {}, database: {}, vnode_id: {vnode_id}", &tenant, &database);
if let Err(e) = engine
.remove_tsfamily_from_wal(&tenant, &database, vnode_id)
.await
{
// Ignore delete vnode error.
trace::error!("Recover: failed to delete vnode, tenant: {tenant}, database: {database}, vnode_id: {vnode_id}, error: {e}");
}
}
WalEntry::DeleteTable(blk) => {
let tenant =
unsafe { String::from_utf8_unchecked(blk.tenant().to_vec()) };
let database =
unsafe { String::from_utf8_unchecked(blk.database().to_vec()) };
let table =
unsafe { String::from_utf8_unchecked(blk.table().to_vec()) };
trace::info!(
"Recover: delete table, tenant: {}, database: {}, table: {}",
&tenant,
&database,
&table
);
if let Err(e) =
engine.drop_table_from_wal(&tenant, &database, &table).await
{
// Ignore delete vnode error.
trace::error!("Recover: failed to delete table, tenant: {tenant}, database: {database}, table: {table}, error: {e}");
}
}
let tenant = unsafe { String::from_utf8_unchecked(blk.tenant().to_vec()) };
let precision = blk.precision();
let req = WritePointsRequest {
version: 1,
meta: Some(Meta {
tenant,
user: None,
password: None,
}),
points: decoded_data[0].to_vec(),
};
engine
.write_from_wal(vnode_id, precision, req, seq)
.await
.unwrap();
} else {
// WalEntry::DeleteVnode(_) => do nothing
// WalEntry::DeleteTable(_) => do nothing
_ => {}
}
}
Ok(None) | Err(Error::WalTruncated) => {
Expand Down
26 changes: 12 additions & 14 deletions tskv/tests/test_kvcore_interface.rs
Expand Up @@ -176,12 +176,10 @@ mod tests {
init_default_global_tracing("tskv_log", "tskv.log", "debug");
let (rt, tskv) = get_tskv("/tmp/test/kvcore/kvcore_flush_delta", None);
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let points = models_helper::create_random_points_include_delta(
&mut fbb,
"db",
"kvcore_flush_delta",
20,
);
let database = "db_flush_delta";
let table = "kvcore_flush_delta";
let points =
models_helper::create_random_points_include_delta(&mut fbb, database, table, 20);
fbb.finish(points, None);
let points = fbb.finished_data().to_vec();
let request = kv_service::WritePointsRequest {
Expand Down Expand Up @@ -219,12 +217,12 @@ mod tests {
tokio::time::sleep(Duration::from_secs(3)).await;
});

assert!(file_manager::try_exists(
"/tmp/test/kvcore/kvcore_flush_delta/data/cnosdb.db/0/tsm"
));
assert!(file_manager::try_exists(
"/tmp/test/kvcore/kvcore_flush_delta/data/cnosdb.db/0/delta"
));
assert!(file_manager::try_exists(format!(
"/tmp/test/kvcore/kvcore_flush_delta/data/cnosdb.{database}/0/tsm"
)));
assert!(file_manager::try_exists(format!(
"/tmp/test/kvcore/kvcore_flush_delta/data/cnosdb.{database}/0/delta"
)));
}

#[tokio::test]
Expand All @@ -245,7 +243,7 @@ mod tests {
let mut fbb = flatbuffers::FlatBufferBuilder::new();
let points = models_helper::create_random_points_include_delta(
&mut fbb,
"db",
"db_build_row_data",
"kvcore_build_row_data",
20,
);
Expand Down Expand Up @@ -276,7 +274,7 @@ mod tests {

init_default_global_tracing(dir.join("log"), "tskv.log", "debug");
let tenant = "cnosdb";
let database = "db";
let database = "db_recover";
let table = "kvcore_recover";
let vnode_id = 10;

Expand Down

0 comments on commit 2aeed92

Please sign in to comment.