fix: restart loses data
Subsegment committed Sep 26, 2023
1 parent 8385020 commit b5ef7aa
Showing 2 changed files with 137 additions and 84 deletions.
213 changes: 133 additions & 80 deletions tskv/src/kvcore.rs
@@ -1,4 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::ops::Deref;
use std::panic;
use std::sync::Arc;
use std::time::{Duration, Instant};
@@ -196,64 +197,70 @@ impl TsKv {
let min_log_seq = self.global_seq_ctx.min_seq();
let wal_readers = wal_manager.recover().await;
let mut recover_task = vec![];
for mut reader in wal_readers {
trace::info!(
"Recover: reading wal '{}' for seq {} to {}",
reader.path().display(),
reader.min_sequence(),
reader.max_sequence(),
);
if reader.is_empty() {
continue;
}
let vnode_last_seq_map = self.global_seq_ctx.clone();
for (vnode_id, readers) in wal_readers {
let task = async move {
let mut decoder = WalDecoder::new();
loop {
match reader.next_wal_entry().await {
Ok(Some(wal_entry_blk)) => {
let seq = wal_entry_blk.seq;
if seq < min_log_seq {
continue;
}
match wal_entry_blk.entry {
WalEntry::Write(blk) => {
let vnode_id = blk.vnode_id();
if vnode_last_seq_map.vnode_min_seq(vnode_id) >= seq {
// If `seq_no` of TsFamily is greater than or equal to `seq`,
                                // it means that data was written to tsm.
continue;
}

self.write_from_wal(vnode_id, seq, &blk, &mut decoder)
.await
.unwrap();
for mut reader in readers {
info!(
"Recover: reading wal '{}' for seq {} to {}",
reader.path().display(),
reader.min_sequence(),
reader.max_sequence(),
);
if reader.is_empty() {
continue;
}
let vnode_last_seq_map = self.global_seq_ctx.clone();
let mut decoder = WalDecoder::new();
loop {
match reader.next_wal_entry().await {
Ok(Some(wal_entry_blk)) => {
let seq = wal_entry_blk.seq;
if seq < min_log_seq {
continue;
}
match wal_entry_blk.entry {
WalEntry::Write(blk) => {
error!("write seq: {}", seq);
let vnode_id = blk.vnode_id();
if vnode_last_seq_map.vnode_min_seq(vnode_id) >= seq {
// If `seq_no` of TsFamily is greater than or equal to `seq`,
                                        // it means that data was written to tsm.
continue;
}

self.write_from_wal(vnode_id, seq, &blk, &mut decoder)
.await
.unwrap();
}

WalEntry::DeleteVnode(blk) => {
if let Err(e) = self.remove_tsfamily_from_wal(&blk).await {
// Ignore delete vnode error.
trace::error!("Recover: failed to delete vnode: {e}");
WalEntry::DeleteVnode(blk) => {
if let Err(e) = self.remove_tsfamily_from_wal(&blk).await {
// Ignore delete vnode error.
trace::error!("Recover: failed to delete vnode: {e}");
}
}
}
WalEntry::DeleteTable(blk) => {
if let Err(e) = self.drop_table_from_wal(&blk).await {
// Ignore delete table error.
trace::error!("Recover: failed to delete table: {e}");
WalEntry::DeleteTable(blk) => {
error!("delete table seq: {}", seq);
if let Err(e) =
self.drop_table_from_wal(&blk, vnode_id).await
{
// Ignore delete table error.
trace::error!("Recover: failed to delete table: {e}");
}
}
_ => {}
}
_ => {}
}
}
Ok(None) | Err(Error::WalTruncated) => {
break;
}
Err(e) => {
panic!(
"Failed to recover from {}: {:?}",
reader.path().display(),
e
);
Ok(None) | Err(Error::WalTruncated) => {
break;
}
Err(e) => {
panic!(
"Failed to recover from {}: {:?}",
reader.path().display(),
e
);
}
}
}
}
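
The hunk above is the heart of the fix. Before this commit, `recover()` handed back a flat `Vec<WalReader>`, and a replayed `WalEntry::DeleteTable` was applied across every vnode; reading the diff, that is what could wipe data belonging to other vnodes on restart. Readers are now keyed by vnode, and the `vnode_id` is threaded into `drop_table_from_wal`, so a delete replays only against the vnode whose WAL recorded it. Below is a minimal, self-contained sketch of that per-vnode replay shape — the `Entry` enum and the nested `Vec`s are illustrative stand-ins, not the crate's real `WalReader`/`WalEntry` types:

```rust
use std::collections::BTreeMap;

type VnodeId = u32;

// Simplified stand-ins for the crate's WAL entry types, for illustration only.
enum Entry {
    Write { seq: u64 },
    DeleteTable { seq: u64, table: String },
}

// Replay entries per vnode: a DeleteTable found in vnode N's WAL is applied
// to vnode N only, instead of to every vnode as before.
fn replay(readers: BTreeMap<VnodeId, Vec<Vec<Entry>>>, min_seq: u64) {
    for (vnode_id, vnode_readers) in readers {
        for reader in vnode_readers {
            for entry in reader {
                match entry {
                    Entry::Write { seq } if seq >= min_seq => {
                        println!("vnode {vnode_id}: replay write, seq {seq}");
                    }
                    Entry::DeleteTable { seq, table } if seq >= min_seq => {
                        println!("vnode {vnode_id}: drop '{table}' in this vnode only, seq {seq}");
                    }
                    _ => {} // entries below min_seq are already persisted; skip
                }
            }
        }
    }
}

fn main() {
    let mut readers = BTreeMap::new();
    readers.insert(1, vec![vec![Entry::Write { seq: 7 }]]);
    readers.insert(2, vec![vec![Entry::DeleteTable { seq: 9, table: "t1".into() }]]);
    replay(readers, 5);
}
```
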
@@ -490,7 +497,12 @@ impl TsKv {
}
}

async fn delete_table(&self, database: Arc<RwLock<Database>>, table: &str) -> Result<()> {
async fn delete_table(
&self,
database: Arc<RwLock<Database>>,
table: &str,
vnode_id: Option<VnodeId>,
) -> Result<()> {
        // TODO Create global DropTable flag for dropping the same table at the same time.
let db_rlock = database.read().await;
let db_owner = db_rlock.owner();
@@ -507,38 +519,75 @@
min_ts: Timestamp::MIN,
max_ts: Timestamp::MAX,
};
for (ts_family_id, ts_family) in database.read().await.ts_families().iter() {
// TODO: Concurrent delete on ts_family.
// TODO: Limit parallel delete to 1.
if let Some(ts_index) = db_rlock.get_ts_index(*ts_family_id) {
let series_ids = ts_index.get_series_id_list(table, &[]).await?;
ts_family
.write()
.await
.delete_series(&series_ids, time_range);

let field_ids: Vec<u64> = series_ids
.iter()
.flat_map(|sid| column_ids.iter().map(|fid| unite_id(*fid, *sid)))
.collect();
info!(
"Drop table: vnode {ts_family_id} deleting {} fields in table: {db_owner}.{table}",
field_ids.len()
);

let version = ts_family.read().await.super_version();
for column_file in version.version.column_files(&field_ids, time_range) {
column_file.add_tombstone(&field_ids, time_range).await?;
}
} else {
continue;
if let Some(vnode_id) = vnode_id {
if let Some(ts_family) = db_rlock.ts_families().get(&vnode_id) {
self.tsf_delete_table(
&db_rlock,
vnode_id,
ts_family.clone(),
table,
time_range,
&column_ids,
)
.await?;
}
} else {
for (ts_family_id, ts_family) in database.read().await.ts_families().iter() {
// TODO: Concurrent delete on ts_family.
// TODO: Limit parallel delete to 1.
self.tsf_delete_table(
&db_rlock,
*ts_family_id,
ts_family.clone(),
table,
time_range,
&column_ids,
)
.await?;
}
}
}

Ok(())
}

async fn tsf_delete_table<Db>(
&self,
db: &Db,
ts_family_id: TseriesFamilyId,
ts_family: Arc<RwLock<TseriesFamily>>,
table: &str,
time_range: &TimeRange,
column_ids: &[ColumnId],
) -> Result<()>
where
Db: Deref<Target = Database>,
{
let db_owner = db.owner();
if let Some(ts_index) = db.get_ts_index(ts_family_id) {
let series_ids = ts_index.get_series_id_list(table, &[]).await?;
ts_family
.write()
.await
.delete_series(&series_ids, time_range);

let field_ids: Vec<u64> = series_ids
.iter()
.flat_map(|sid| column_ids.iter().map(|fid| unite_id(*fid, *sid)))
.collect();
info!(
"Drop table: vnode {ts_family_id} deleting {} fields in table: {db_owner}.{table}",
field_ids.len()
);

let version = ts_family.read().await.super_version();
for column_file in version.version.column_files(&field_ids, time_range) {
column_file.add_tombstone(&field_ids, time_range).await?;
}
}
Ok(())
}
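
The `Option<VnodeId>` parameter is what separates the two callers of `delete_table`: WAL replay passes `Some(vnode_id)` so the drop stays scoped to that one time-series family, while the engine's live DROP TABLE path (further down, `delete_table(db, table, None)`) still fans out to every family. A hedged sketch of that dispatch, with a plain `HashMap` and a unit struct standing in for the real `ts_families()` map and `Arc<RwLock<TseriesFamily>>`:

```rust
use std::collections::HashMap;

type VnodeId = u32;

// Stand-in for a ts_family; the real type is Arc<RwLock<TseriesFamily>>.
struct TsFamily;

fn delete_in_family(vnode_id: VnodeId, _family: &TsFamily, table: &str) {
    println!("deleting '{table}' in vnode {vnode_id}");
}

// Some(id): scoped drop (WAL replay). None: drop in every family (live DROP TABLE).
fn delete_table(families: &HashMap<VnodeId, TsFamily>, table: &str, vnode_id: Option<VnodeId>) {
    match vnode_id {
        Some(id) => {
            if let Some(family) = families.get(&id) {
                delete_in_family(id, family, table);
            }
        }
        None => {
            for (id, family) in families {
                delete_in_family(*id, family, table);
            }
        }
    }
}

fn main() {
    let families = HashMap::from([(1, TsFamily), (2, TsFamily)]);
    delete_table(&families, "metrics", Some(1)); // replay path: vnode 1 only
    delete_table(&families, "metrics", None);    // engine path: all vnodes
}
```
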

async fn drop_columns(
&self,
database: Arc<RwLock<Database>>,
@@ -694,7 +743,11 @@ impl TsKv {
/// Delete all data of a table.
///
    /// Data comes from the WAL (write-ahead log), so it is not written back to the WAL.
async fn drop_table_from_wal(&self, block: &wal::DeleteTableBlock) -> Result<()> {
async fn drop_table_from_wal(
&self,
block: &wal::DeleteTableBlock,
vnode_id: VnodeId,
) -> Result<()> {
let tenant = block.tenant_utf8()?;
let database = block.database_utf8()?;
let table = block.table_utf8()?;
Expand All @@ -703,7 +756,7 @@ impl TsKv {
&tenant, &database, &table
);
if let Some(db) = self.version_set.read().await.get_db(tenant, database) {
return self.delete_table(db, table).await;
return self.delete_table(db, table, Some(vnode_id)).await;
}
Ok(())
}
@@ -861,7 +914,7 @@ impl Engine for TsKv {
source: error::ChannelReceiveError::WriteWalResult { source: e },
})??;

return self.delete_table(db, table).await;
return self.delete_table(db, table, None).await;
}

Ok(())
8 changes: 4 additions & 4 deletions tskv/src/wal/mod.rs
@@ -35,7 +35,7 @@
//! +------------+---------------+--------------+--------------+
//! ```

use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use std::fmt::Display;
use std::path::PathBuf;
use std::sync::Arc;
@@ -636,12 +636,12 @@ impl WalManager {
}
Ok(())
}
pub async fn recover(&self) -> Vec<WalReader> {
let mut recover_task = vec![];
pub async fn recover(&self) -> BTreeMap<VnodeId, Vec<WalReader>> {
let mut recover_task = BTreeMap::new();
for (vnode_id, vnode_wal) in self.wal_set.iter() {
let min_seq = self.global_seq_ctx.vnode_min_seq(*vnode_id);
if let Ok(readers) = vnode_wal.recover(min_seq).await {
recover_task.extend(readers);
recover_task.insert(*vnode_id, readers);
} else {
panic!("Failed to recover wal for vnode '{}'", vnode_id)
}
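
The commit does not say why `BTreeMap` was chosen over `HashMap` here, but a plausible reading is that ordered keys make recovery visit vnodes in a deterministic order. A small sketch of the new grouping, with `&str` placeholders standing in for `WalReader`s:

```rust
use std::collections::BTreeMap;

type VnodeId = u32;

// Before: readers from every vnode were flattened into one Vec, losing the
// vnode association. After: they stay keyed by vnode, and BTreeMap iterates
// in ascending key order, so recovery visits vnodes deterministically.
fn group_readers(per_vnode: Vec<(VnodeId, Vec<&'static str>)>) -> BTreeMap<VnodeId, Vec<&'static str>> {
    let mut grouped = BTreeMap::new();
    for (vnode_id, readers) in per_vnode {
        grouped.insert(vnode_id, readers);
    }
    grouped
}

fn main() {
    // Insertion order is 2 then 1; iteration order is 1 then 2.
    let grouped = group_readers(vec![(2, vec!["wal_2_00001"]), (1, vec!["wal_1_00001", "wal_1_00002"])]);
    for (vnode_id, readers) in &grouped {
        println!("vnode {vnode_id}: {} wal file(s)", readers.len());
    }
}
```
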
